Skip to main content

mz_transform/
dataflow.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Whole-dataflow optimization
11//!
12//! A dataflow may contain multiple views, each of which may only be
13//! optimized locally. However, information like demand and predicate
14//! pushdown can be applied across views once we understand the context
15//! in which the views will be executed.
16
17use std::collections::{BTreeMap, BTreeSet};
18
19use itertools::Itertools;
20use mz_compute_types::dataflows::{BuildDesc, DataflowDesc, DataflowDescription, IndexImport};
21use mz_expr::{
22    AccessStrategy, CollectionPlan, Id, JoinImplementation, LocalId, MapFilterProject,
23    MirRelationExpr, MirScalarExpr, RECURSION_LIMIT,
24};
25use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError};
26use mz_ore::{assert_none, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
27use mz_repr::GlobalId;
28use mz_repr::explain::{DeltaJoinIndexUsageType, IndexUsageType, UsedIndexes};
29use proptest_derive::Arbitrary;
30use serde::{Deserialize, Serialize};
31
32use crate::monotonic::MonotonicFlag;
33use crate::notice::RawOptimizerNotice;
34use crate::{IndexOracle, Optimizer, TransformCtx, TransformError};
35
/// Optimizes the implementation of each dataflow.
///
/// Inlines views, performs a full optimization pass including physical
/// planning using the supplied indexes, propagates filtering and projection
/// information to dataflow sources and lifts monotonicity information.
///
/// When `fast_path_optimizer` is true, a single fast-path optimizer pass
/// replaces the full logical/physical pipeline. In both cases index imports
/// are pruned and annotated afterwards, and the final plan is traced.
///
/// # Errors
///
/// Returns a [`TransformError`] if any of the constituent passes fails.
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment ="global")
)]
pub fn optimize_dataflow(
    dataflow: &mut DataflowDesc,
    transform_ctx: &mut TransformCtx,
    fast_path_optimizer: bool,
) -> Result<(), TransformError> {
    // Inline views that are used in only one other view.
    inline_views(dataflow)?;

    if fast_path_optimizer {
        optimize_dataflow_relations(
            dataflow,
            &Optimizer::fast_path_optimizer(transform_ctx),
            transform_ctx,
        )?;
    } else {
        // Logical optimization pass after view inlining
        optimize_dataflow_relations(
            dataflow,
            #[allow(deprecated)]
            &Optimizer::logical_optimizer(transform_ctx),
            transform_ctx,
        )?;

        // Propagate predicates towards dataflow inputs across view boundaries.
        optimize_dataflow_filters(dataflow)?;
        // TODO: when the linear operator contract ensures that propagated
        // predicates are always applied, projections and filters can be removed
        // from where they come from. Once projections and filters can be removed,
        // TODO: it would be useful for demand to be optimized after filters
        // that way demand only includes the columns that are still necessary after
        // the filters are applied.
        optimize_dataflow_demand(dataflow)?;

        // A smaller logical optimization pass after projections and filters are
        // pushed down across views.
        optimize_dataflow_relations(
            dataflow,
            &Optimizer::logical_cleanup_pass(transform_ctx, false),
            transform_ctx,
        )?;

        // Physical optimization pass
        optimize_dataflow_relations(
            dataflow,
            &Optimizer::physical_optimizer(transform_ctx),
            transform_ctx,
        )?;

        // Monotonicity is derived from the final plan shape, so this must run
        // after the physical pass.
        optimize_dataflow_monotonic(dataflow, transform_ctx)?;
    }

    prune_and_annotate_dataflow_index_imports(
        dataflow,
        transform_ctx.indexes,
        transform_ctx.df_meta,
    )?;

    // Warning: If you want to add a transform call here, consider it very carefully whether it
    // could accidentally invalidate information that we already derived above in
    // `optimize_dataflow_monotonic` or `prune_and_annotate_dataflow_index_imports`.

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}
110
111/// Inline views used in one other view, and in no exported objects.
112#[mz_ore::instrument(
113    target = "optimizer",
114    level = "debug",
115    fields(path.segment = "inline_views")
116)]
117fn inline_views(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
118    // We cannot inline anything whose `BuildDesc::id` appears in either the
119    // `index_exports` or `sink_exports` of `dataflow`, because we lose our
120    // ability to name it.
121
122    // A view can / should be in-lined in another view if it is only used by
123    // one subsequent view. If there are two distinct views that have not
124    // themselves been merged, then too bad and it doesn't get inlined.
125
126    // Starting from the *last* object to build, walk backwards and inline
127    // any view that is neither referenced by a `index_exports` nor
128    // `sink_exports` nor more than two remaining objects to build.
129
130    for index in (0..dataflow.objects_to_build.len()).rev() {
131        // Capture the name used by others to reference this view.
132        let global_id = dataflow.objects_to_build[index].id;
133        // Determine if any exports directly reference this view.
134        let mut occurs_in_export = false;
135        for (_gid, sink_desc) in dataflow.sink_exports.iter() {
136            if sink_desc.from == global_id {
137                occurs_in_export = true;
138            }
139        }
140        for (_, (index_desc, _)) in dataflow.index_exports.iter() {
141            if index_desc.on_id == global_id {
142                occurs_in_export = true;
143            }
144        }
145        // Count the number of subsequent views that reference this view.
146        let mut occurrences_in_later_views = Vec::new();
147        for other in (index + 1)..dataflow.objects_to_build.len() {
148            if dataflow.objects_to_build[other]
149                .plan
150                .depends_on()
151                .contains(&global_id)
152            {
153                occurrences_in_later_views.push(other);
154            }
155        }
156        // Inline if the view is referenced in one view and no exports.
157        if !occurs_in_export && occurrences_in_later_views.len() == 1 {
158            let other = occurrences_in_later_views[0];
159            // We can remove this view and insert it in the later view,
160            // but are not able to relocate the later view `other`.
161
162            // When splicing in the `index` view, we need to create disjoint
163            // identifiers for the Let's `body` and `value`, as well as a new
164            // identifier for the binding itself. Following `NormalizeLets`, we
165            // go with the binding first, then the value, then the body.
166            let mut id_gen = crate::IdGen::default();
167            let new_local = LocalId::new(id_gen.allocate_id());
168            // Use the same `id_gen` to assign new identifiers to `index`.
169            crate::normalize_lets::renumber_bindings(
170                dataflow.objects_to_build[index].plan.as_inner_mut(),
171                &mut id_gen,
172            )?;
173            // Assign new identifiers to the other relation.
174            crate::normalize_lets::renumber_bindings(
175                dataflow.objects_to_build[other].plan.as_inner_mut(),
176                &mut id_gen,
177            )?;
178            // Install the `new_local` name wherever `global_id` was used.
179            dataflow.objects_to_build[other]
180                .plan
181                .as_inner_mut()
182                .visit_pre_mut(|expr| {
183                    if let MirRelationExpr::Get { id, .. } = expr {
184                        if id == &Id::Global(global_id) {
185                            *id = Id::Local(new_local);
186                        }
187                    }
188                });
189
190            // With identifiers rewritten, we can replace `other` with
191            // a `MirRelationExpr::Let` binding, whose value is `index` and
192            // whose body is `other`.
193            let body = dataflow.objects_to_build[other]
194                .plan
195                .as_inner_mut()
196                .take_dangerous();
197            let value = dataflow.objects_to_build[index]
198                .plan
199                .as_inner_mut()
200                .take_dangerous();
201            *dataflow.objects_to_build[other].plan.as_inner_mut() = MirRelationExpr::Let {
202                id: new_local,
203                value: Box::new(value),
204                body: Box::new(body),
205            };
206            dataflow.objects_to_build.remove(index);
207        }
208    }
209
210    mz_repr::explain::trace_plan(dataflow);
211
212    Ok(())
213}
214
215/// Performs either the logical or the physical optimization pass on the
216/// dataflow using the supplied set of indexes.
217#[mz_ore::instrument(
218    target = "optimizer",
219    level = "debug",
220    fields(path.segment = optimizer.name)
221)]
222fn optimize_dataflow_relations(
223    dataflow: &mut DataflowDesc,
224    optimizer: &Optimizer,
225    ctx: &mut TransformCtx,
226) -> Result<(), TransformError> {
227    // Re-optimize each dataflow
228    for object in dataflow.objects_to_build.iter_mut() {
229        // Re-run all optimizations on the composite views.
230        ctx.set_global_id(object.id);
231        optimizer.transform(object.plan.as_inner_mut(), ctx)?;
232        ctx.reset_global_id();
233    }
234
235    mz_repr::explain::trace_plan(dataflow);
236
237    Ok(())
238}
239
240/// Pushes demand information from published outputs to dataflow inputs,
241/// projecting away unnecessary columns.
242///
243/// Dataflows that exist for the sake of generating plan explanations do not
244/// have published outputs. In this case, we push demand information from views
245/// not depended on by other views to dataflow inputs.
246#[mz_ore::instrument(
247    target = "optimizer",
248    level = "debug",
249    fields(path.segment ="demand")
250)]
251fn optimize_dataflow_demand(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
252    // Maps id -> union of known columns demanded from the source/view with the
253    // corresponding id.
254    let mut demand = BTreeMap::new();
255
256    if dataflow.index_exports.is_empty() && dataflow.sink_exports.is_empty() {
257        // In the absence of any exports, just demand all columns from views
258        // that are not depended on by another view, which is currently the last
259        // object in `objects_to_build`.
260
261        // A DataflowDesc without exports is currently created in the context of
262        // EXPLAIN outputs. This ensures that the output has all the columns of
263        // the original explainee.
264        if let Some(build_desc) = dataflow.objects_to_build.iter_mut().rev().next() {
265            demand
266                .entry(Id::Global(build_desc.id))
267                .or_insert_with(BTreeSet::new)
268                .extend(0..build_desc.plan.as_inner_mut().arity());
269        }
270    } else {
271        // Demand all columns of inputs to sinks.
272        for (_id, sink) in dataflow.sink_exports.iter() {
273            let input_id = sink.from;
274            demand
275                .entry(Id::Global(input_id))
276                .or_insert_with(BTreeSet::new)
277                .extend(0..dataflow.arity_of(&input_id));
278        }
279
280        // Demand all columns of inputs to exported indexes.
281        for (_id, (desc, _typ)) in dataflow.index_exports.iter() {
282            let input_id = desc.on_id;
283            demand
284                .entry(Id::Global(input_id))
285                .or_insert_with(BTreeSet::new)
286                .extend(0..dataflow.arity_of(&input_id));
287        }
288    }
289
290    optimize_dataflow_demand_inner(
291        dataflow
292            .objects_to_build
293            .iter_mut()
294            .rev()
295            .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
296        &mut demand,
297    )?;
298
299    mz_repr::explain::trace_plan(dataflow);
300
301    Ok(())
302}
303
304/// Pushes demand through views in `view_sequence` in order, removing
305/// columns not demanded.
306///
307/// This method is made public for the sake of testing.
308/// TODO: make this private once we allow multiple exports per dataflow.
309pub fn optimize_dataflow_demand_inner<'a, I>(
310    view_sequence: I,
311    demand: &mut BTreeMap<Id, BTreeSet<usize>>,
312) -> Result<(), TransformError>
313where
314    I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
315{
316    // Maps id -> The projection that was pushed down on the view with the
317    // corresponding id.
318    let mut applied_projection = BTreeMap::new();
319    // Collect the mutable references to views after pushing projection down
320    // in order to run cleanup actions on them in a second loop.
321    let mut view_refs = Vec::new();
322    let projection_pushdown = crate::movement::ProjectionPushdown::default();
323    for (id, view) in view_sequence {
324        if let Some(columns) = demand.get(&id) {
325            let projection_pushed_down = columns.iter().map(|c| *c).collect();
326            // Push down the projection consisting of the entries of `columns`
327            // in increasing order.
328            projection_pushdown.action(view, &projection_pushed_down, demand)?;
329            let new_type = view.typ();
330            applied_projection.insert(id, (projection_pushed_down, new_type));
331        }
332        view_refs.push(view);
333    }
334
335    for view in view_refs {
336        // Update `Get` nodes to reflect any columns that have been projected away.
337        projection_pushdown.update_projection_around_get(view, &applied_projection);
338    }
339
340    Ok(())
341}
342
343/// Pushes predicate to dataflow inputs.
344#[mz_ore::instrument(
345    target = "optimizer",
346    level = "debug",
347    fields(path.segment ="filters")
348)]
349fn optimize_dataflow_filters(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
350    // Contains id -> predicates map, describing those predicates that
351    // can (but need not) be applied to the collection named by `id`.
352    let mut predicates = BTreeMap::<Id, BTreeSet<mz_expr::MirScalarExpr>>::new();
353
354    // Propagate predicate information from outputs to inputs.
355    optimize_dataflow_filters_inner(
356        dataflow
357            .objects_to_build
358            .iter_mut()
359            .rev()
360            .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
361        &mut predicates,
362    )?;
363
364    // Push predicate information into the SourceDesc.
365    for (source_id, source_import) in dataflow.source_imports.iter_mut() {
366        let source = &mut source_import.desc;
367        if let Some(list) = predicates.remove(&Id::Global(*source_id)) {
368            if !list.is_empty() {
369                // Canonicalize the order of predicates, for stable plans.
370                let mut list = list.into_iter().collect::<Vec<_>>();
371                list.sort();
372                // Install no-op predicate information if none exists.
373                if source.arguments.operators.is_none() {
374                    source.arguments.operators = Some(MapFilterProject::new(source.typ.arity()));
375                }
376                // Add any predicates that can be pushed to the source.
377                if let Some(operator) = source.arguments.operators.take() {
378                    source.arguments.operators = Some(operator.filter(list));
379                    source.arguments.operators.as_mut().map(|x| x.optimize());
380                }
381            }
382        }
383    }
384
385    mz_repr::explain::trace_plan(dataflow);
386
387    Ok(())
388}
389
390/// Pushes filters down through views in `view_sequence` in order.
391///
392/// This method is made public for the sake of testing.
393/// TODO: make this private once we allow multiple exports per dataflow.
394pub fn optimize_dataflow_filters_inner<'a, I>(
395    view_iter: I,
396    predicates: &mut BTreeMap<Id, BTreeSet<mz_expr::MirScalarExpr>>,
397) -> Result<(), TransformError>
398where
399    I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
400{
401    let transform = crate::predicate_pushdown::PredicatePushdown::default();
402    for (id, view) in view_iter {
403        if let Some(list) = predicates.get(&id).clone() {
404            if !list.is_empty() {
405                *view = view.take_dangerous().filter(list.iter().cloned());
406            }
407        }
408        transform.action(view, predicates)?;
409    }
410    Ok(())
411}
412
413/// Propagates information about monotonic inputs through operators,
414/// using [`mz_repr::optimize::OptimizerFeatures`] from `ctx` for [`crate::analysis::Analysis`].
415#[mz_ore::instrument(
416    target = "optimizer",
417    level = "debug",
418    fields(path.segment ="monotonic")
419)]
420pub fn optimize_dataflow_monotonic(
421    dataflow: &mut DataflowDesc,
422    ctx: &mut TransformCtx,
423) -> Result<(), TransformError> {
424    let mut monotonic_ids = BTreeSet::new();
425    for (source_id, source_import) in dataflow.source_imports.iter() {
426        if source_import.monotonic {
427            monotonic_ids.insert(source_id.clone());
428        }
429    }
430    for (
431        _index_id,
432        IndexImport {
433            desc: index_desc,
434            monotonic,
435            ..
436        },
437    ) in dataflow.index_imports.iter()
438    {
439        if *monotonic {
440            monotonic_ids.insert(index_desc.on_id.clone());
441        }
442    }
443
444    let monotonic_flag = MonotonicFlag::default();
445
446    for build_desc in dataflow.objects_to_build.iter_mut() {
447        monotonic_flag.transform(build_desc.plan.as_inner_mut(), ctx, &monotonic_ids)?;
448    }
449
450    mz_repr::explain::trace_plan(dataflow);
451
452    Ok(())
453}
454
/// Determine whether we require snapshots from our durable source imports.
/// (For example, these can often be skipped for simple subscribe queries.)
pub fn optimize_dataflow_snapshot(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
    // For every global id, true iff we need a snapshot for that global ID.
    // This is computed bottom-up: subscribes may or may not require a snapshot from their inputs,
    // index exports definitely do, and objects-to-build require a snapshot from their inputs if
    // either they need to provide a snapshot as output or they may need snapshots internally, eg. to
    // compute a join.
    let mut downstream_requires_snapshot = BTreeMap::new();

    // Sink exports carry an explicit `with_snapshot` flag; OR it into the
    // requirement for the collection the sink reads from.
    for (_id, export) in &dataflow.sink_exports {
        *downstream_requires_snapshot
            .entry(Id::Global(export.from))
            .or_default() |= export.with_snapshot;
    }
    // Exported indexes always require a snapshot of the object they index.
    for (_id, (export, _typ)) in &dataflow.index_exports {
        *downstream_requires_snapshot
            .entry(Id::Global(export.on_id))
            .or_default() |= true;
    }
    for BuildDesc { id: _, plan } in dataflow.objects_to_build.iter().rev() {
        // For now, we treat all intermediate nodes as potentially requiring a snapshot.
        // Walk the AST, marking anything depended on by a compute object as snapshot-required.
        // (Note: the root is seeded with `true` and children are always pushed with `true`,
        // so `requires_snapshot` currently only matters at the `Get` leaves.)
        let mut todo = vec![(true, &plan.0)];
        while let Some((requires_snapshot, expr)) = todo.pop() {
            match expr {
                MirRelationExpr::Get { id, .. } => {
                    // A `Get` may name either a global import or a local binding;
                    // both are recorded under their `Id` key.
                    *downstream_requires_snapshot.entry(*id).or_default() |= requires_snapshot;
                }
                other => {
                    todo.extend(other.children().rev().map(|c| (true, c)));
                }
            }
        }
    }
    for (id, import) in &mut dataflow.source_imports {
        // A source absent from the map defaults to `false` (no downstream need).
        let with_snapshot = downstream_requires_snapshot
            .entry(Id::Global(*id))
            .or_default();

        // As above, fetch the snapshot if there are any transformations on the raw source data.
        // (And we'll always need to check for things like temporal filters, since those allow
        // snapshot data to affect diffs at times past the as-of.)
        *with_snapshot |= import.desc.arguments.operators.is_some();

        import.with_snapshot = *with_snapshot;
    }
    for (_id, import) in &mut dataflow.index_imports {
        // As with sources, an index import absent from the map defaults to `false`.
        let with_snapshot = downstream_requires_snapshot
            .entry(Id::Global(import.desc.on_id))
            .or_default();

        import.with_snapshot = *with_snapshot;
    }

    Ok(())
}
512
513/// Restricts the indexes imported by `dataflow` to only the ones it needs.
514/// It also adds to the `DataflowMetainfo` how each index will be used.
515/// It also annotates global `Get`s with whether they will be reads from Persist or an index, plus
516/// their index usage types.
517///
518/// The input `dataflow` should import all indexes belonging to all views/sources/tables it
519/// references.
520///
521/// The input plans should be normalized with `NormalizeLets`! Otherwise, we might find dangling
522/// `ArrangeBy`s at the top of unused Let bindings.
523#[mz_ore::instrument(
524    target = "optimizer",
525    level = "debug",
526    fields(path.segment = "index_imports")
527)]
528fn prune_and_annotate_dataflow_index_imports(
529    dataflow: &mut DataflowDesc,
530    indexes: &dyn IndexOracle,
531    dataflow_metainfo: &mut DataflowMetainfo,
532) -> Result<(), TransformError> {
533    // Preparation.
534    // Let's save the unique keys of the sources. This will inform which indexes to choose for full
535    // scans. (We can't get this info from `source_imports`, because `source_imports` only has those
536    // sources that are not getting an indexed read.)
537    let mut source_keys = BTreeMap::new();
538    for build_desc in dataflow.objects_to_build.iter() {
539        build_desc
540            .plan
541            .as_inner()
542            .visit_pre(|expr: &MirRelationExpr| match expr {
543                MirRelationExpr::Get {
544                    id: Id::Global(global_id),
545                    typ,
546                    ..
547                } => {
548                    source_keys.entry(*global_id).or_insert_with(|| {
549                        typ.keys
550                            .iter()
551                            .map(|key| {
552                                key.iter()
553                                    // Convert the Vec<usize> key to Vec<MirScalarExpr>, so that
554                                    // later we can more easily compare index keys to these keys.
555                                    .map(|col_idx| MirScalarExpr::column(*col_idx))
556                                    .collect()
557                            })
558                            .collect()
559                    });
560                }
561                _ => {}
562            });
563    }
564
565    // This will be a mapping of
566    // (ids used by exports and objects to build) ->
567    // (arrangement keys and usage types on that id that have been requested)
568    let mut index_reqs_by_id = BTreeMap::new();
569
570    // Go through the MIR plans of `objects_to_build` and collect which arrangements are requested
571    // for which we also have an available index.
572    for build_desc in dataflow.objects_to_build.iter_mut() {
573        CollectIndexRequests::new(&source_keys, indexes, &mut index_reqs_by_id)
574            .collect_index_reqs(build_desc.plan.as_inner_mut())?;
575    }
576
577    // Collect index usages by `sink_exports`.
578    // A sink export sometimes wants to directly use an imported index. I know of one case where
579    // this happens: The dataflow for a SUBSCRIBE on an indexed view won't have any
580    // `objects_to_build`, but will want to directly read from the index and write to a sink.
581    for (_sink_id, sink_desc) in dataflow.sink_exports.iter() {
582        // First, let's see if there exists an index on the id that the sink wants. If not, there is
583        // nothing we can do here.
584        if let Some((idx_id, arbitrary_idx_key)) = indexes.indexes_on(sink_desc.from).next() {
585            // If yes, then we'll add a request of _some_ index: If we already collected an index
586            // request on this id, then use that, otherwise use the above arbitrarily picked index.
587            let requested_idxs = index_reqs_by_id
588                .entry(sink_desc.from)
589                .or_insert_with(Vec::new);
590            if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.get(0) {
591                requested_idxs.push((
592                    *already_req_idx_id,
593                    already_req_key.clone(),
594                    IndexUsageType::SinkExport,
595                ));
596            } else {
597                requested_idxs.push((
598                    idx_id,
599                    arbitrary_idx_key.to_owned(),
600                    IndexUsageType::SinkExport,
601                ));
602            }
603        }
604    }
605
606    // Collect index usages by `index_exports`.
607    for (_id, (index_desc, _)) in dataflow.index_exports.iter() {
608        // First, let's see if there exists an index on the id that the exported index is on. If
609        // not, there is nothing we can do here.
610        if let Some((idx_id, arbitrary_index_key)) = indexes.indexes_on(index_desc.on_id).next() {
611            // If yes, then we'll add an index request of some index: If we already collected an
612            // index request on this id, then use that, otherwise use the above arbitrarily picked
613            // index.
614            let requested_idxs = index_reqs_by_id
615                .entry(index_desc.on_id)
616                .or_insert_with(Vec::new);
617            if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.get(0) {
618                requested_idxs.push((
619                    *already_req_idx_id,
620                    already_req_key.clone(),
621                    IndexUsageType::IndexExport,
622                ));
623            } else {
624                // This is surprising: Actually, an index creation dataflow always has a plan in
625                // `objects_to_build` that will have a Get of the object that the index is on (see
626                // `DataflowDescription::export_index`). Therefore, we should have already requested
627                // an index usage when seeing that Get in `CollectIndexRequests`.
628                soft_panic_or_log!(
629                    "We are seeing an index export on an id that's not mentioned in `objects_to_build`"
630                );
631                requested_idxs.push((
632                    idx_id,
633                    arbitrary_index_key.to_owned(),
634                    IndexUsageType::IndexExport,
635                ));
636            }
637        }
638    }
639
640    // By now, `index_reqs_by_id` has all ids that we think might benefit from having an index on.
641    // Moreover, for each of these ids, if any index exists on it, then we should have already
642    // picked one. If not, then we have a bug somewhere. In that case, do a soft panic, and add an
643    // Unknown usage, picking an arbitrary index.
644    for (id, index_reqs) in index_reqs_by_id.iter_mut() {
645        if index_reqs.is_empty() {
646            // Try to pick an arbitrary index to be fully scanned.
647            if let Some((idx_id, key)) = indexes.indexes_on(*id).next() {
648                soft_panic_or_log!(
649                    "prune_and_annotate_dataflow_index_imports didn't find any index for an id, even though one exists
650id: {}, key: {:?}",
651                    id,
652                    key
653                );
654                index_reqs.push((idx_id, key.to_owned(), IndexUsageType::Unknown));
655            }
656        }
657    }
658
659    // Adjust FullScans to not introduce a new index dependency if there is also some non-FullScan
660    // request on the same id.
661    // `full_scan_changes` saves the changes that we do: Each (Get id, index id) entry indicates
662    // that if a Get has that id, then any full scan index accesses on it should be changed to use
663    // the indicated index id.
664    let mut full_scan_changes = BTreeMap::new();
665    for (get_id, index_reqs) in index_reqs_by_id.iter_mut() {
666        // Let's choose a non-FullScan access (if exists).
667        if let Some((picked_idx, picked_idx_key)) = choose_index(
668            &source_keys,
669            get_id,
670            &index_reqs
671                .iter()
672                .filter_map(|(idx_id, key, usage_type)| match usage_type {
673                    IndexUsageType::FullScan => None,
674                    _ => Some((*idx_id, key.clone())),
675                })
676                .collect_vec(),
677        ) {
678            // Found a non-FullScan access. Modify all FullScans to use the same index as that one.
679            for (idx_id, key, usage_type) in index_reqs {
680                match usage_type {
681                    IndexUsageType::FullScan => {
682                        full_scan_changes.insert(get_id, picked_idx);
683                        *idx_id = picked_idx;
684                        key.clone_from(&picked_idx_key);
685                    }
686                    _ => {}
687                }
688            }
689        }
690    }
691    // Apply the above full scan changes to also the Gets.
692    for build_desc in dataflow.objects_to_build.iter_mut() {
693        build_desc
694            .plan
695            .as_inner_mut()
696            .visit_pre_mut(|expr: &mut MirRelationExpr| {
697                match expr {
698                    MirRelationExpr::Get {
699                        id: Id::Global(global_id),
700                        typ: _,
701                        access_strategy: persist_or_index,
702                    } => {
703                        if let Some(new_idx_id) = full_scan_changes.get(global_id) {
704                            match persist_or_index {
705                                AccessStrategy::UnknownOrLocal => {
706                                    // Should have been already filled by `collect_index_reqs`.
707                                    unreachable!()
708                                }
709                                AccessStrategy::Persist => {
710                                    // We already know that it's an indexed access.
711                                    unreachable!()
712                                }
713                                AccessStrategy::SameDataflow => {
714                                    // We have not added such annotations yet.
715                                    unreachable!()
716                                }
717                                AccessStrategy::Index(accesses) => {
718                                    for (idx_id, usage_type) in accesses {
719                                        if matches!(usage_type, IndexUsageType::FullScan) {
720                                            *idx_id = *new_idx_id;
721                                        }
722                                    }
723                                }
724                            }
725                        }
726                    }
727                    _ => {}
728                }
729            });
730    }
731
732    // Annotate index imports by their usage types
733    dataflow_metainfo.index_usage_types = BTreeMap::new();
734    for (
735        index_id,
736        IndexImport {
737            desc: index_desc,
738            typ: _,
739            monotonic: _,
740            with_snapshot: _,
741        },
742    ) in dataflow.index_imports.iter_mut()
743    {
744        // A sanity check that we are not importing an index that we are also exporting.
745        assert!(
746            !dataflow
747                .index_exports
748                .iter()
749                .map(|(exported_index_id, _)| exported_index_id)
750                .any(|exported_index_id| exported_index_id == index_id)
751        );
752
753        let mut new_usage_types = Vec::new();
754        // Let's see whether something has requested an index on this object that this imported
755        // index is on.
756        if let Some(index_reqs) = index_reqs_by_id.get(&index_desc.on_id) {
757            for (req_idx_id, req_key, req_usage_type) in index_reqs {
758                if req_idx_id == index_id {
759                    soft_assert_eq_or_log!(*req_key, index_desc.key);
760                    new_usage_types.push(req_usage_type.clone());
761                }
762            }
763        }
764        if !new_usage_types.is_empty() {
765            dataflow_metainfo
766                .index_usage_types
767                .insert(*index_id, new_usage_types);
768        }
769    }
770
771    // Prune index imports to only those that are used
772    dataflow
773        .index_imports
774        .retain(|id, _index_import| dataflow_metainfo.index_usage_types.contains_key(id));
775
776    // Determine AccessStrategy::SameDataflow accesses. These were classified as
777    // AccessStrategy::Persist inside collect_index_reqs, so now we check these, and if the id is of
778    // a collection that we are building ourselves, then we adjust the access strategy.
779    let mut objects_to_build_ids = BTreeSet::new();
780    for BuildDesc { id, plan: _ } in dataflow.objects_to_build.iter() {
781        objects_to_build_ids.insert(id.clone());
782    }
783    for build_desc in dataflow.objects_to_build.iter_mut() {
784        build_desc
785            .plan
786            .as_inner_mut()
787            .visit_pre_mut(|expr: &mut MirRelationExpr| match expr {
788                MirRelationExpr::Get {
789                    id: Id::Global(global_id),
790                    typ: _,
791                    access_strategy,
792                } => match access_strategy {
793                    AccessStrategy::Persist => {
794                        if objects_to_build_ids.contains(global_id) {
795                            *access_strategy = AccessStrategy::SameDataflow;
796                        }
797                    }
798                    _ => {}
799                },
800                _ => {}
801            });
802    }
803
804    // A sanity check that all Get annotations indicate indexes that are present in `index_imports`.
805    for build_desc in dataflow.objects_to_build.iter() {
806        build_desc
807            .plan
808            .as_inner()
809            .visit_pre(|expr: &MirRelationExpr| match expr {
810                MirRelationExpr::Get {
811                    id: Id::Global(_),
812                    typ: _,
813                    access_strategy: AccessStrategy::Index(accesses),
814                } => {
815                    for (idx_id, _) in accesses {
816                        soft_assert_or_log!(
817                            dataflow.index_imports.contains_key(idx_id),
818                            "Dangling Get index annotation"
819                        );
820                    }
821                }
822                _ => {}
823            });
824    }
825
826    mz_repr::explain::trace_plan(dataflow);
827
828    Ok(())
829}
830
831/// Pick an index from a given Vec of index keys.
832///
833/// Currently, we pick as follows:
834///  - If there is an index on a unique key, then we pick that. (It might be better distributed, and
835///    is less likely to get dropped than other indexes.)
836///  - Otherwise, we pick an arbitrary index.
837///
838/// TODO: There are various edge cases where a better choice would be possible:
839/// - Some indexes might be less skewed than others. (Although, picking a unique key tries to
840///   capture this already.)
841/// - Some indexes might have an error, while others don't.
842///   <https://github.com/MaterializeInc/database-issues/issues/4455>
843/// - Some indexes might have more extra data in their keys (because of being on more complicated
844///   expressions than just column references), which won't be used in a full scan.
845fn choose_index(
846    source_keys: &BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
847    id: &GlobalId,
848    indexes: &Vec<(GlobalId, Vec<MirScalarExpr>)>,
849) -> Option<(GlobalId, Vec<MirScalarExpr>)> {
850    match source_keys.get(id) {
851        None => indexes.iter().next().cloned(), // pick an arbitrary index
852        Some(coll_keys) => match indexes
853            .iter()
854            .find(|(_idx_id, key)| coll_keys.contains(&*key))
855        {
856            Some((idx_id, key)) => Some((*idx_id, key.clone())),
857            None => indexes.iter().next().cloned(), // pick an arbitrary index
858        },
859    }
860}
861
/// Visitor state for walking a `MirRelationExpr` and collecting, for each `Get` of a global
/// collection, which indexes the plan wants to use on it and how (lookup, full scan, etc.).
#[derive(Debug)]
struct CollectIndexRequests<'a> {
    /// We were told about these unique keys on sources.
    source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
    /// We were told about these indexes being available.
    indexes_available: &'a dyn IndexOracle,
    /// We'll be collecting index requests here.
    index_reqs_by_id:
        &'a mut BTreeMap<GlobalId, Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>>,
    /// As we recurse down a MirRelationExpr, we'll need to keep track of the context of the
    /// current node (see docs on `IndexUsageContext` about what context we keep).
    /// Moreover, we need to propagate this context from cte uses to cte definitions.
    /// `context_across_lets` will keep track of the contexts that reached each use of a LocalId
    /// added together.
    context_across_lets: BTreeMap<LocalId, Vec<IndexUsageContext>>,
    /// Guards against stack overflow while recursing through deep expressions.
    recursion_guard: RecursionGuard,
}
879
/// Enables `checked_recur_mut` in `collect_index_reqs_inner`, so deep expressions produce a
/// `RecursionLimitError` instead of overflowing the stack.
impl<'a> CheckedRecursion for CollectIndexRequests<'a> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
885
impl<'a> CollectIndexRequests<'a> {
    /// Creates a collector that will record discovered index requests into `index_reqs_by_id`.
    fn new(
        source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
        indexes_available: &'a dyn IndexOracle,
        index_reqs_by_id: &'a mut BTreeMap<
            GlobalId,
            Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>,
        >,
    ) -> CollectIndexRequests<'a> {
        CollectIndexRequests {
            source_keys,
            indexes_available,
            index_reqs_by_id,
            context_across_lets: BTreeMap::new(),
            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
        }
    }

    /// Walks `expr`, recording into `self.index_reqs_by_id` which indexes are requested on each
    /// global collection and with what usage type, and annotating each global `Get` in `expr`
    /// with an `AccessStrategy` (either index accesses or `Persist`).
    ///
    /// Returns an error if `expr` is deeper than the recursion limit.
    pub fn collect_index_reqs(
        &mut self,
        expr: &mut MirRelationExpr,
    ) -> Result<(), RecursionLimitError> {
        assert!(self.context_across_lets.is_empty());
        // The plan root is allowed to be a bare `Get` without any arrangement.
        self.collect_index_reqs_inner(
            expr,
            &IndexUsageContext::from_usage_type(IndexUsageType::PlanRootNoArrangement),
        )?;
        assert!(self.context_across_lets.is_empty());
        // Sanity check that we don't have any `DeltaJoinIndexUsageType::Unknown` remaining.
        for (_id, index_reqs) in self.index_reqs_by_id.iter() {
            for (_, _, index_usage_type) in index_reqs {
                soft_assert_or_log!(
                    !matches!(
                        index_usage_type,
                        IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
                    ),
                    "Delta join Unknown index usage remained"
                );
            }
        }
        Ok(())
    }

    /// Recursive worker for [`Self::collect_index_reqs`].
    ///
    /// `contexts` describes how the operators above the current node intend to use an
    /// arrangement of this subtree (see docs on `IndexUsageContext`). It is a `Vec` because a
    /// node can effectively have multiple parents through local let bindings.
    fn collect_index_reqs_inner(
        &mut self,
        expr: &mut MirRelationExpr,
        contexts: &Vec<IndexUsageContext>,
    ) -> Result<(), RecursionLimitError> {
        self.checked_recur_mut(|this| {

            // If an index exists on `on_id`, this function picks an index to be fully scanned.
            let pick_index_for_full_scan = |on_id: &GlobalId| {
                // Note that the choice we make here might be modified later at the
                // "Adjust FullScans to not introduce a new index dependency".
                choose_index(
                    this.source_keys,
                    on_id,
                    &this.indexes_available.indexes_on(*on_id).map(
                        |(idx_id, key)| (idx_id, key.iter().cloned().collect_vec())
                    ).collect_vec()
                )
            };

            // See comment on `IndexUsageContext`.
            Ok(match expr {
                MirRelationExpr::Join {
                    inputs,
                    implementation,
                    ..
                } => {
                    match implementation {
                        JoinImplementation::Differential(..) => {
                            for input in inputs {
                                this.collect_index_reqs_inner(
                                    input,
                                    &IndexUsageContext::from_usage_type(
                                        IndexUsageType::DifferentialJoin,
                                    ),
                                )?;
                            }
                        }
                        JoinImplementation::DeltaQuery(..) => {
                            // For Delta joins, the first input is special, see
                            // https://github.com/MaterializeInc/database-issues/issues/2115
                            this.collect_index_reqs_inner(
                                &mut inputs[0],
                                &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)),
                            )?;
                            for input in &mut inputs[1..] {
                                this.collect_index_reqs_inner(
                                    input,
                                    &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(
                                        DeltaJoinIndexUsageType::Lookup,
                                    )),
                                )?;
                            }
                        }
                        JoinImplementation::IndexedFilter(_coll_id, idx_id, ..) => {
                            for input in inputs {
                                this.collect_index_reqs_inner(
                                    input,
                                    &IndexUsageContext::from_usage_type(IndexUsageType::Lookup(*idx_id)),
                                )?;
                            }
                        }
                        JoinImplementation::Unimplemented => {
                            soft_panic_or_log!(
                                "CollectIndexRequests encountered an Unimplemented join"
                            );
                        }
                    }
                }
                MirRelationExpr::ArrangeBy { input, keys } => {
                    this.collect_index_reqs_inner(input, &IndexUsageContext::add_keys(contexts, keys))?;
                }
                MirRelationExpr::Get {
                    id: Id::Global(global_id),
                    access_strategy: persist_or_index,
                    ..
                } => {
                    this.index_reqs_by_id
                        .entry(*global_id)
                        .or_insert_with(Vec::new);
                    // If the context is empty, it means we didn't see an operator that would
                    // specifically want to use an index for this Get. However, let's still try to
                    // find an index for a full scan.
                    let mut try_full_scan = contexts.is_empty();
                    let mut index_accesses = Vec::new();
                    for context in contexts {
                        match &context.requested_keys {
                            None => {
                                // We have some index usage context, but didn't see an `ArrangeBy`.
                                try_full_scan = true;
                                match context.usage_type {
                                    IndexUsageType::FullScan | IndexUsageType::SinkExport | IndexUsageType::IndexExport => {
                                        // Not possible, because these don't go through
                                        // IndexUsageContext at all.
                                        unreachable!()
                                    },
                                    // You can find more info on why the following join cases
                                    // shouldn't happen in comments of the Join lowering to LIR.
                                    IndexUsageType::Lookup(_) => soft_panic_or_log!("CollectIndexRequests encountered an IndexedFilter join without an ArrangeBy"),
                                    IndexUsageType::DifferentialJoin => soft_panic_or_log!("CollectIndexRequests encountered a Differential join without an ArrangeBy"),
                                    IndexUsageType::DeltaJoin(_) => soft_panic_or_log!("CollectIndexRequests encountered a Delta join without an ArrangeBy"),
                                    IndexUsageType::PlanRootNoArrangement => {
                                        // This is ok: the entire plan is a `Get`, with not even an
                                        // `ArrangeBy`. Note that if an index exists, the usage will
                                        // be saved as `FullScan` (NOT as `PlanRootNoArrangement`),
                                        // because we are going into the `try_full_scan` if.
                                    },
                                    IndexUsageType::FastPathLimit => {
                                        // These are created much later, not even inside
                                        // `prune_and_annotate_dataflow_index_imports`.
                                        unreachable!()
                                    }
                                    IndexUsageType::DanglingArrangeBy => {
                                        // Not possible, because we create `DanglingArrangeBy`
                                        // only when we see an `ArrangeBy`.
                                        unreachable!()
                                    },
                                    IndexUsageType::Unknown => {
                                        // These are added only after `CollectIndexRequests` has run.
                                        unreachable!()
                                    }
                                }
                            }
                            Some(requested_keys) => {
                                for requested_key in requested_keys {
                                    match this
                                        .indexes_available
                                        .indexes_on(*global_id)
                                        .find(|(available_idx_id, available_key)| {
                                            match context.usage_type {
                                                IndexUsageType::Lookup(req_idx_id) => {
                                                    // `LiteralConstraints` already picked an index
                                                    // by id. Let's use that one.
                                                    assert!(!(available_idx_id == &req_idx_id && available_key != &requested_key));
                                                    available_idx_id == &req_idx_id
                                                },
                                                _ => available_key == &requested_key,
                                            }
                                        })
                                    {
                                        Some((idx_id, key)) => {
                                            this.index_reqs_by_id
                                                .get_mut(global_id)
                                                .unwrap()
                                                .push((idx_id, key.to_owned(), context.usage_type.clone()));
                                            index_accesses.push((idx_id, context.usage_type.clone()));
                                        },
                                        None => {
                                            // If there is a key requested for which we don't have an
                                            // index, then we might still be able to do a full scan of a
                                            // differently keyed index.
                                            try_full_scan = true;
                                        }
                                    }
                                }
                                if requested_keys.is_empty() {
                                    // It's a bit weird if an MIR ArrangeBy is not requesting any
                                    // key, but let's try a full scan in that case anyhow.
                                    try_full_scan = true;
                                }
                            }
                        }
                    }
                    if try_full_scan {
                        // Keep in mind that when having 2 contexts coming from 2 uses of a Let,
                        // this code can't distinguish between the case when there is 1 ArrangeBy at the
                        // top of the Let, or when the 2 uses each have an `ArrangeBy`. In both cases,
                        // we'll add only 1 full scan, which would be wrong in the latter case. However,
                        // the latter case can't currently happen until we do
                        // https://github.com/MaterializeInc/database-issues/issues/6363
                        // Also note that currently we are deduplicating index usage types when
                        // printing index usages in EXPLAIN.
                        if let Some((idx_id, key)) = pick_index_for_full_scan(global_id) {
                            this.index_reqs_by_id
                                .get_mut(global_id)
                                .unwrap()
                                .push((idx_id, key.to_owned(), IndexUsageType::FullScan));
                            index_accesses.push((idx_id, IndexUsageType::FullScan));
                        }
                    }
                    if index_accesses.is_empty() {
                        *persist_or_index = AccessStrategy::Persist;
                    } else {
                        *persist_or_index = AccessStrategy::Index(index_accesses);
                    }
                }
                MirRelationExpr::Get {
                    id: Id::Local(local_id),
                    ..
                } => {
                    // Add the current context to the vector of contexts of `local_id`.
                    // (The unwrap is safe, because the Let and LetRec cases start with inserting an
                    // empty entry.)
                    this.context_across_lets
                        .get_mut(local_id)
                        .unwrap()
                        .extend(contexts.iter().cloned());
                    // No recursive call here, because Get has no inputs.
                }
                MirRelationExpr::Let { id, value, body } => {
                    let shadowed_context = this.context_across_lets.insert(id.clone(), Vec::new());
                    // No shadowing in MIR
                    assert_none!(shadowed_context);
                    // We go backwards: Recurse on the body and then the value.
                    this.collect_index_reqs_inner(body, contexts)?;
                    // The above call filled in the entry for `id` in `context_across_lets` (if it
                    // was referenced). Anyhow, at least an empty entry should exist, because we started
                    // above with inserting it.
                    this.collect_index_reqs_inner(
                        value,
                        &this.context_across_lets[id].clone(),
                    )?;
                    // Clean up the id from the saved contexts.
                    this.context_across_lets.remove(id);
                }
                MirRelationExpr::LetRec {
                    ids,
                    values,
                    limits: _,
                    body,
                } => {
                    for id in ids.iter() {
                        let shadowed_context = this.context_across_lets.insert(id.clone(), Vec::new());
                        assert_none!(shadowed_context); // No shadowing in MIR
                    }
                    // We go backwards: Recurse on the body first.
                    this.collect_index_reqs_inner(body, contexts)?;
                    // Reset the contexts of the ids (of the current LetRec), because an arrangement
                    // from a value can't be used in the body.
                    for id in ids.iter() {
                        *this.context_across_lets.get_mut(id).unwrap() = Vec::new();
                    }
                    // Recurse on the values in reverse order.
                    // Note that we do only one pass, i.e., we won't see context through a Get that
                    // refers to the previous iteration. But this is ok, because we can't reuse
                    // arrangements across iterations anyway.
                    for (id, value) in ids.iter().rev().zip_eq(values.iter_mut().rev()) {
                        this.collect_index_reqs_inner(
                            value,
                            &this.context_across_lets[id].clone(),
                        )?;
                    }
                    // Clean up the ids from the saved contexts.
                    for id in ids {
                        this.context_across_lets.remove(id);
                    }
                }
                _ => {
                    // Nothing interesting at this node, recurse with the empty context (regardless of
                    // what context we got from above).
                    let empty_context = Vec::new();
                    for child in expr.children_mut() {
                        this.collect_index_reqs_inner(child, &empty_context)?;
                    }
                }
            })
        })
    }
}
1188
/// This struct will save info about parent nodes as we are descending down a `MirRelationExpr`.
/// We always start with filling in `usage_type` when we see an operation that uses an arrangement,
/// and then we fill in `requested_keys` when we see an `ArrangeBy`. So, the pattern that we are
/// looking for is
/// ```text
/// <operation that uses an index>
///   ArrangeBy <requested_keys>
///     Get <global_id>
/// ```
/// When we reach a `Get` to a global id, we access this context struct to see if the rest of the
/// pattern is present above the `Get`.
///
/// Note that we usually put this struct in a Vec, because we track context across local let
/// bindings, which means that a node can have multiple parents.
#[derive(Debug, Clone)]
struct IndexUsageContext {
    /// How the operation above us will use an arrangement (e.g., a join lookup, a full scan).
    usage_type: IndexUsageType,
    /// Keys requested by an `ArrangeBy` seen on the way down; `None` until we see one.
    requested_keys: Option<BTreeSet<Vec<MirScalarExpr>>>,
}
1208
1209impl IndexUsageContext {
1210    pub fn from_usage_type(usage_type: IndexUsageType) -> Vec<Self> {
1211        vec![IndexUsageContext {
1212            usage_type,
1213            requested_keys: None,
1214        }]
1215    }
1216
1217    // Add the keys of an ArrangeBy into the contexts.
1218    // Soft_panics if haven't already seen something that indicates what the index will be used for.
1219    pub fn add_keys(
1220        old_contexts: &Vec<IndexUsageContext>,
1221        keys_to_add: &Vec<Vec<MirScalarExpr>>,
1222    ) -> Vec<IndexUsageContext> {
1223        let old_contexts = if old_contexts.is_empty() {
1224            // No join above us, and we are not at the root. Why does this ArrangeBy even exist?
1225            soft_panic_or_log!("CollectIndexRequests encountered a dangling ArrangeBy");
1226            // Anyhow, let's create a context with a `DanglingArrangeBy` index usage, so that we
1227            // have a place to note down the requested keys below.
1228            IndexUsageContext::from_usage_type(IndexUsageType::DanglingArrangeBy)
1229        } else {
1230            old_contexts.clone()
1231        };
1232        old_contexts
1233            .into_iter()
1234            .flat_map(|old_context| {
1235                if !matches!(
1236                    old_context.usage_type,
1237                    IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
1238                ) {
1239                    // If it's not an unknown delta join usage, then we simply note down the new
1240                    // keys into `requested_keys`.
1241                    let mut context = old_context.clone();
1242                    if context.requested_keys.is_none() {
1243                        context.requested_keys = Some(BTreeSet::new());
1244                    }
1245                    context
1246                        .requested_keys
1247                        .as_mut()
1248                        .unwrap()
1249                        .extend(keys_to_add.iter().cloned());
1250                    Some(context).into_iter().chain(None)
1251                } else {
1252                    // If it's an unknown delta join usage, then we need to figure out whether this
1253                    // is a full scan or a lookup.
1254                    //
1255                    // `source_key` in `DeltaPathPlan` determines which arrangement we are going to
1256                    // scan when starting the rendering of a delta path. This is the one for which
1257                    // we want a `DeltaJoinIndexUsageType::FirstInputFullScan`.
1258                    //
1259                    // However, `DeltaPathPlan` is an LIR concept, and here we need to figure out
1260                    // the `source_key` based on the MIR plan. We do this by doing the same as
1261                    // `DeltaJoinPlan::create_from`: choose the smallest key (by `Ord`).
1262                    let source_key = keys_to_add
1263                        .iter()
1264                        .min()
1265                        .expect("ArrangeBy below a delta join has at least one key");
1266                    let full_scan_context = IndexUsageContext {
1267                        requested_keys: Some(BTreeSet::from([source_key.clone()])),
1268                        usage_type: IndexUsageType::DeltaJoin(
1269                            DeltaJoinIndexUsageType::FirstInputFullScan,
1270                        ),
1271                    };
1272                    let lookup_keys = keys_to_add
1273                        .into_iter()
1274                        .filter(|key| *key != source_key)
1275                        .cloned()
1276                        .collect_vec();
1277                    if lookup_keys.is_empty() {
1278                        Some(full_scan_context).into_iter().chain(None)
1279                    } else {
1280                        let lookup_context = IndexUsageContext {
1281                            requested_keys: Some(lookup_keys.into_iter().collect()),
1282                            usage_type: IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Lookup),
1283                        };
1284                        Some(full_scan_context)
1285                            .into_iter()
1286                            .chain(Some(lookup_context))
1287                    }
1288                }
1289            })
1290            .collect()
1291    }
1292}
1293
/// Extra information about the dataflow. This is not going to be shipped, but has to be processed
/// in other ways, e.g., showing notices to the user, or saving meta-information to the catalog.
///
/// The `Notice` type parameter is the representation of optimizer notices; it defaults to
/// [`RawOptimizerNotice`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct DataflowMetainfo<Notice = RawOptimizerNotice> {
    /// Notices that the optimizer wants to show to users.
    /// For pushing a new element, use [`Self::push_optimizer_notice_dedup`].
    pub optimizer_notices: Vec<Notice>,
    /// What kind of operation (full scan, lookup, ...) will access each index. Computed by
    /// `prune_and_annotate_dataflow_index_imports`.
    pub index_usage_types: BTreeMap<GlobalId, Vec<IndexUsageType>>,
}
1305
1306impl<Notice> Default for DataflowMetainfo<Notice> {
1307    fn default() -> Self {
1308        DataflowMetainfo {
1309            optimizer_notices: Vec::new(),
1310            index_usage_types: BTreeMap::new(),
1311        }
1312    }
1313}
1314
1315impl<Notice> DataflowMetainfo<Notice> {
1316    /// Create a [`UsedIndexes`] instance by resolving each `id` in the
1317    /// `index_ids` iterator against an entry expected to exist in the
1318    /// [`DataflowMetainfo::index_usage_types`].
1319    pub fn used_indexes<T>(&self, df_desc: &DataflowDescription<T>) -> UsedIndexes {
1320        UsedIndexes::new(
1321            df_desc
1322                .index_imports
1323                .iter()
1324                .map(|(id, _)| {
1325                    let entry = self.index_usage_types.get(id).cloned();
1326                    // If an entry does not exist, mark the usage type for this
1327                    // index as `Unknown`.
1328                    //
1329                    // This should never happen if this method is called after
1330                    // running `prune_and_annotate_dataflow_index_imports` on
1331                    // the dataflow (this happens at the end of the
1332                    // `optimize_dataflow` call).
1333                    let index_usage_type = entry.unwrap_or_else(|| vec![IndexUsageType::Unknown]);
1334
1335                    (*id, index_usage_type)
1336                })
1337                .collect(),
1338        )
1339    }
1340}
1341
1342impl DataflowMetainfo<RawOptimizerNotice> {
1343    /// Pushes a [`RawOptimizerNotice`] into [`Self::optimizer_notices`], but
1344    /// only if the exact same notice is not already present.
1345    pub fn push_optimizer_notice_dedup<T>(&mut self, notice: T)
1346    where
1347        T: Into<RawOptimizerNotice>,
1348    {
1349        let notice = notice.into();
1350        if !self.optimizer_notices.contains(&notice) {
1351            self.optimizer_notices.push(notice);
1352        }
1353    }
1354}