Skip to main content

mz_transform/
dataflow.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Whole-dataflow optimization
11//!
12//! A dataflow may contain multiple views, each of which may only be
13//! optimized locally. However, information like demand and predicate
14//! pushdown can be applied across views once we understand the context
15//! in which the views will be executed.
16
17use std::collections::{BTreeMap, BTreeSet};
18
19use itertools::Itertools;
20use mz_compute_types::dataflows::{BuildDesc, DataflowDesc, DataflowDescription, IndexImport};
21use mz_expr::{
22    AccessStrategy, CollectionPlan, Id, JoinImplementation, LocalId, MapFilterProject,
23    MirRelationExpr, MirScalarExpr, RECURSION_LIMIT,
24};
25use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError};
26use mz_ore::{assert_none, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
27use mz_repr::GlobalId;
28use mz_repr::explain::{DeltaJoinIndexUsageType, IndexUsageType, UsedIndexes};
29#[cfg(any(test, feature = "proptest"))]
30use proptest_derive::Arbitrary;
31use serde::{Deserialize, Serialize};
32
33use crate::monotonic::MonotonicFlag;
34use crate::notice::RawOptimizerNotice;
35use crate::{IndexOracle, Optimizer, TransformCtx, TransformError};
36
37/// Optimizes the implementation of each dataflow.
38///
39/// Inlines views, performs a full optimization pass including physical
40/// planning using the supplied indexes, propagates filtering and projection
41/// information to dataflow sources and lifts monotonicity information.
42#[mz_ore::instrument(
43    target = "optimizer",
44    level = "debug",
45    fields(path.segment ="global")
46)]
47pub fn optimize_dataflow(
48    dataflow: &mut DataflowDesc,
49    transform_ctx: &mut TransformCtx,
50    fast_path_optimizer: bool,
51) -> Result<(), TransformError> {
52    // Inline views that are used in only one other view.
53    inline_views(dataflow)?;
54
55    if fast_path_optimizer {
56        optimize_dataflow_relations(
57            dataflow,
58            &Optimizer::fast_path_optimizer(transform_ctx),
59            transform_ctx,
60        )?;
61    } else {
62        // Logical optimization pass after view inlining
63        optimize_dataflow_relations(
64            dataflow,
65            #[allow(deprecated)]
66            &Optimizer::logical_optimizer(transform_ctx),
67            transform_ctx,
68        )?;
69
70        optimize_dataflow_filters(dataflow)?;
71        // TODO: when the linear operator contract ensures that propagated
72        // predicates are always applied, projections and filters can be removed
73        // from where they come from. Once projections and filters can be removed,
74        // TODO: it would be useful for demand to be optimized after filters
75        // that way demand only includes the columns that are still necessary after
76        // the filters are applied.
77        optimize_dataflow_demand(dataflow)?;
78
79        // A smaller logical optimization pass after projections and filters are
80        // pushed down across views.
81        optimize_dataflow_relations(
82            dataflow,
83            &Optimizer::logical_cleanup_pass(transform_ctx, false),
84            transform_ctx,
85        )?;
86
87        // Physical optimization pass
88        optimize_dataflow_relations(
89            dataflow,
90            &Optimizer::physical_optimizer(transform_ctx),
91            transform_ctx,
92        )?;
93
94        optimize_dataflow_monotonic(dataflow, transform_ctx)?;
95    }
96
97    prune_and_annotate_dataflow_index_imports(
98        dataflow,
99        transform_ctx.indexes,
100        transform_ctx.df_meta,
101    )?;
102
103    // Warning: If you want to add a transform call here, consider it very carefully whether it
104    // could accidentally invalidate information that we already derived above in
105    // `optimize_dataflow_monotonic` or `prune_and_annotate_dataflow_index_imports`.
106
107    mz_repr::explain::trace_plan(dataflow);
108
109    Ok(())
110}
111
112/// Inline views used in one other view, and in no exported objects.
113#[mz_ore::instrument(
114    target = "optimizer",
115    level = "debug",
116    fields(path.segment = "inline_views")
117)]
118fn inline_views(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
119    // We cannot inline anything whose `BuildDesc::id` appears in either the
120    // `index_exports` or `sink_exports` of `dataflow`, because we lose our
121    // ability to name it.
122
123    // A view can / should be in-lined in another view if it is only used by
124    // one subsequent view. If there are two distinct views that have not
125    // themselves been merged, then too bad and it doesn't get inlined.
126
127    // Starting from the *last* object to build, walk backwards and inline
128    // any view that is neither referenced by a `index_exports` nor
129    // `sink_exports` nor more than two remaining objects to build.
130
131    for index in (0..dataflow.objects_to_build.len()).rev() {
132        // Capture the name used by others to reference this view.
133        let global_id = dataflow.objects_to_build[index].id;
134        // Determine if any exports directly reference this view.
135        let mut occurs_in_export = false;
136        for (_gid, sink_desc) in dataflow.sink_exports.iter() {
137            if sink_desc.from == global_id {
138                occurs_in_export = true;
139            }
140        }
141        for (_, (index_desc, _)) in dataflow.index_exports.iter() {
142            if index_desc.on_id == global_id {
143                occurs_in_export = true;
144            }
145        }
146        // Count the number of subsequent views that reference this view.
147        let mut occurrences_in_later_views = Vec::new();
148        for other in (index + 1)..dataflow.objects_to_build.len() {
149            if dataflow.objects_to_build[other]
150                .plan
151                .depends_on()
152                .contains(&global_id)
153            {
154                occurrences_in_later_views.push(other);
155            }
156        }
157        // Inline if the view is referenced in one view and no exports.
158        if !occurs_in_export && occurrences_in_later_views.len() == 1 {
159            let other = occurrences_in_later_views[0];
160            // We can remove this view and insert it in the later view,
161            // but are not able to relocate the later view `other`.
162
163            // When splicing in the `index` view, we need to create disjoint
164            // identifiers for the Let's `body` and `value`, as well as a new
165            // identifier for the binding itself. Following `NormalizeLets`, we
166            // go with the binding first, then the value, then the body.
167            let mut id_gen = crate::IdGen::default();
168            let new_local = LocalId::new(id_gen.allocate_id());
169            // Use the same `id_gen` to assign new identifiers to `index`.
170            crate::normalize_lets::renumber_bindings(
171                dataflow.objects_to_build[index].plan.as_inner_mut(),
172                &mut id_gen,
173            )?;
174            // Assign new identifiers to the other relation.
175            crate::normalize_lets::renumber_bindings(
176                dataflow.objects_to_build[other].plan.as_inner_mut(),
177                &mut id_gen,
178            )?;
179            // Install the `new_local` name wherever `global_id` was used.
180            dataflow.objects_to_build[other]
181                .plan
182                .as_inner_mut()
183                .visit_pre_mut(|expr| {
184                    if let MirRelationExpr::Get { id, .. } = expr {
185                        if id == &Id::Global(global_id) {
186                            *id = Id::Local(new_local);
187                        }
188                    }
189                });
190
191            // With identifiers rewritten, we can replace `other` with
192            // a `MirRelationExpr::Let` binding, whose value is `index` and
193            // whose body is `other`.
194            let body = dataflow.objects_to_build[other]
195                .plan
196                .as_inner_mut()
197                .take_dangerous();
198            let value = dataflow.objects_to_build[index]
199                .plan
200                .as_inner_mut()
201                .take_dangerous();
202            *dataflow.objects_to_build[other].plan.as_inner_mut() = MirRelationExpr::Let {
203                id: new_local,
204                value: Box::new(value),
205                body: Box::new(body),
206            };
207            dataflow.objects_to_build.remove(index);
208        }
209    }
210
211    mz_repr::explain::trace_plan(dataflow);
212
213    Ok(())
214}
215
216/// Performs either the logical or the physical optimization pass on the
217/// dataflow using the supplied set of indexes.
218#[mz_ore::instrument(
219    target = "optimizer",
220    level = "debug",
221    fields(path.segment = optimizer.name)
222)]
223fn optimize_dataflow_relations(
224    dataflow: &mut DataflowDesc,
225    optimizer: &Optimizer,
226    ctx: &mut TransformCtx,
227) -> Result<(), TransformError> {
228    // Re-optimize each dataflow
229    for object in dataflow.objects_to_build.iter_mut() {
230        // Re-run all optimizations on the composite views.
231        ctx.set_global_id(object.id);
232        optimizer.transform(object.plan.as_inner_mut(), ctx)?;
233        ctx.reset_global_id();
234    }
235
236    mz_repr::explain::trace_plan(dataflow);
237
238    Ok(())
239}
240
241/// Pushes demand information from published outputs to dataflow inputs,
242/// projecting away unnecessary columns.
243///
244/// Dataflows that exist for the sake of generating plan explanations do not
245/// have published outputs. In this case, we push demand information from views
246/// not depended on by other views to dataflow inputs.
247#[mz_ore::instrument(
248    target = "optimizer",
249    level = "debug",
250    fields(path.segment ="demand")
251)]
252fn optimize_dataflow_demand(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
253    // Maps id -> union of known columns demanded from the source/view with the
254    // corresponding id.
255    let mut demand = BTreeMap::new();
256
257    if dataflow.index_exports.is_empty() && dataflow.sink_exports.is_empty() {
258        // In the absence of any exports, just demand all columns from views
259        // that are not depended on by another view, which is currently the last
260        // object in `objects_to_build`.
261
262        // A DataflowDesc without exports is currently created in the context of
263        // EXPLAIN outputs. This ensures that the output has all the columns of
264        // the original explainee.
265        if let Some(build_desc) = dataflow.objects_to_build.iter_mut().rev().next() {
266            demand
267                .entry(Id::Global(build_desc.id))
268                .or_insert_with(BTreeSet::new)
269                .extend(0..build_desc.plan.as_inner_mut().arity());
270        }
271    } else {
272        // Demand all columns of inputs to sinks.
273        for (_id, sink) in dataflow.sink_exports.iter() {
274            let input_id = sink.from;
275            demand
276                .entry(Id::Global(input_id))
277                .or_insert_with(BTreeSet::new)
278                .extend(0..dataflow.arity_of(&input_id));
279        }
280
281        // Demand all columns of inputs to exported indexes.
282        for (_id, (desc, _typ)) in dataflow.index_exports.iter() {
283            let input_id = desc.on_id;
284            demand
285                .entry(Id::Global(input_id))
286                .or_insert_with(BTreeSet::new)
287                .extend(0..dataflow.arity_of(&input_id));
288        }
289    }
290
291    optimize_dataflow_demand_inner(
292        dataflow
293            .objects_to_build
294            .iter_mut()
295            .rev()
296            .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
297        &mut demand,
298    )?;
299
300    mz_repr::explain::trace_plan(dataflow);
301
302    Ok(())
303}
304
305/// Pushes demand through views in `view_sequence` in order, removing
306/// columns not demanded.
307///
308/// This method is made public for the sake of testing.
309/// TODO: make this private once we allow multiple exports per dataflow.
310pub fn optimize_dataflow_demand_inner<'a, I>(
311    view_sequence: I,
312    demand: &mut BTreeMap<Id, BTreeSet<usize>>,
313) -> Result<(), TransformError>
314where
315    I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
316{
317    // Maps id -> The projection that was pushed down on the view with the
318    // corresponding id.
319    let mut applied_projection = BTreeMap::new();
320    // Collect the mutable references to views after pushing projection down
321    // in order to run cleanup actions on them in a second loop.
322    let mut view_refs = Vec::new();
323    let projection_pushdown = crate::movement::ProjectionPushdown::default();
324    for (id, view) in view_sequence {
325        if let Some(columns) = demand.get(&id) {
326            let projection_pushed_down = columns.iter().map(|c| *c).collect();
327            // Push down the projection consisting of the entries of `columns`
328            // in increasing order.
329            projection_pushdown.action(view, &projection_pushed_down, demand)?;
330            let new_type = view.typ();
331            applied_projection.insert(id, (projection_pushed_down, new_type));
332        }
333        view_refs.push(view);
334    }
335
336    for view in view_refs {
337        // Update `Get` nodes to reflect any columns that have been projected away.
338        projection_pushdown.update_projection_around_get(view, &applied_projection);
339    }
340
341    Ok(())
342}
343
344/// Pushes predicate to dataflow inputs.
345#[mz_ore::instrument(
346    target = "optimizer",
347    level = "debug",
348    fields(path.segment ="filters")
349)]
350fn optimize_dataflow_filters(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
351    // Contains id -> predicates map, describing those predicates that
352    // can (but need not) be applied to the collection named by `id`.
353    let mut predicates = BTreeMap::<Id, BTreeSet<mz_expr::MirScalarExpr>>::new();
354
355    // Propagate predicate information from outputs to inputs.
356    optimize_dataflow_filters_inner(
357        dataflow
358            .objects_to_build
359            .iter_mut()
360            .rev()
361            .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
362        &mut predicates,
363    )?;
364
365    // Push predicate information into the SourceDesc.
366    for (source_id, source_import) in dataflow.source_imports.iter_mut() {
367        let source = &mut source_import.desc;
368        if let Some(list) = predicates.remove(&Id::Global(*source_id)) {
369            if !list.is_empty() {
370                // Canonicalize the order of predicates, for stable plans.
371                let mut list = list.into_iter().collect::<Vec<_>>();
372                list.sort();
373                // Install no-op predicate information if none exists.
374                if source.arguments.operators.is_none() {
375                    source.arguments.operators = Some(MapFilterProject::new(source.typ.arity()));
376                }
377                // Add any predicates that can be pushed to the source.
378                if let Some(operator) = source.arguments.operators.take() {
379                    source.arguments.operators = Some(operator.filter(list));
380                    source.arguments.operators.as_mut().map(|x| x.optimize());
381                }
382            }
383        }
384    }
385
386    mz_repr::explain::trace_plan(dataflow);
387
388    Ok(())
389}
390
391/// Pushes filters down through views in `view_sequence` in order.
392///
393/// This method is made public for the sake of testing.
394/// TODO: make this private once we allow multiple exports per dataflow.
395pub fn optimize_dataflow_filters_inner<'a, I>(
396    view_iter: I,
397    predicates: &mut BTreeMap<Id, BTreeSet<mz_expr::MirScalarExpr>>,
398) -> Result<(), TransformError>
399where
400    I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
401{
402    let transform = crate::predicate_pushdown::PredicatePushdown::default();
403    for (id, view) in view_iter {
404        if let Some(list) = predicates.get(&id).clone() {
405            if !list.is_empty() {
406                *view = view.take_dangerous().filter(list.iter().cloned());
407            }
408        }
409        transform.action(view, predicates)?;
410    }
411    Ok(())
412}
413
414/// Propagates information about monotonic inputs through operators,
415/// using [`mz_repr::optimize::OptimizerFeatures`] from `ctx` for [`crate::analysis::Analysis`].
416#[mz_ore::instrument(
417    target = "optimizer",
418    level = "debug",
419    fields(path.segment ="monotonic")
420)]
421pub fn optimize_dataflow_monotonic(
422    dataflow: &mut DataflowDesc,
423    ctx: &mut TransformCtx,
424) -> Result<(), TransformError> {
425    let mut monotonic_ids = BTreeSet::new();
426    for (source_id, source_import) in dataflow.source_imports.iter() {
427        if source_import.monotonic {
428            monotonic_ids.insert(source_id.clone());
429        }
430    }
431    for (
432        _index_id,
433        IndexImport {
434            desc: index_desc,
435            monotonic,
436            ..
437        },
438    ) in dataflow.index_imports.iter()
439    {
440        if *monotonic {
441            monotonic_ids.insert(index_desc.on_id.clone());
442        }
443    }
444
445    let monotonic_flag = MonotonicFlag::default();
446
447    for build_desc in dataflow.objects_to_build.iter_mut() {
448        monotonic_flag.transform(build_desc.plan.as_inner_mut(), ctx, &monotonic_ids)?;
449    }
450
451    mz_repr::explain::trace_plan(dataflow);
452
453    Ok(())
454}
455
456/// Determine whether we require snapshots from our durable source imports.
457/// (For example, these can often be skipped for simple subscribe queries.)
458pub fn optimize_dataflow_snapshot(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
459    // For every global id, true iff we need a snapshot for that global ID.
460    // This is computed bottom-up: subscribes may or may not require a snapshot from their inputs,
461    // index exports definitely do, and objects-to-build require a snapshot from their inputs if
462    // either they need to provide a snapshot as output or they may need snapshots internally, eg. to
463    // compute a join.
464    let mut downstream_requires_snapshot = BTreeMap::new();
465
466    for (_id, export) in &dataflow.sink_exports {
467        *downstream_requires_snapshot
468            .entry(Id::Global(export.from))
469            .or_default() |= export.with_snapshot;
470    }
471    for (_id, (export, _typ)) in &dataflow.index_exports {
472        *downstream_requires_snapshot
473            .entry(Id::Global(export.on_id))
474            .or_default() |= true;
475    }
476    for BuildDesc { id: _, plan } in dataflow.objects_to_build.iter().rev() {
477        // For now, we treat all intermediate nodes as potentially requiring a snapshot.
478        // Walk the AST, marking anything depended on by a compute object as snapshot-required.
479        let mut todo = vec![(true, &plan.0)];
480        while let Some((requires_snapshot, expr)) = todo.pop() {
481            match expr {
482                MirRelationExpr::Get { id, .. } => {
483                    *downstream_requires_snapshot.entry(*id).or_default() |= requires_snapshot;
484                }
485                other => {
486                    todo.extend(other.children().rev().map(|c| (true, c)));
487                }
488            }
489        }
490    }
491    for (id, import) in &mut dataflow.source_imports {
492        let with_snapshot = downstream_requires_snapshot
493            .entry(Id::Global(*id))
494            .or_default();
495
496        // As above, fetch the snapshot if there are any transformations on the raw source data.
497        // (And we'll always need to check for things like temporal filters, since those allow
498        // snapshot data to affect diffs at times past the as-of.)
499        *with_snapshot |= import.desc.arguments.operators.is_some();
500
501        import.with_snapshot = *with_snapshot;
502    }
503    for (_id, import) in &mut dataflow.index_imports {
504        let with_snapshot = downstream_requires_snapshot
505            .entry(Id::Global(import.desc.on_id))
506            .or_default();
507
508        import.with_snapshot = *with_snapshot;
509    }
510
511    Ok(())
512}
513
514/// Restricts the indexes imported by `dataflow` to only the ones it needs.
515/// It also adds to the `DataflowMetainfo` how each index will be used.
516/// It also annotates global `Get`s with whether they will be reads from Persist or an index, plus
517/// their index usage types.
518///
519/// The input `dataflow` should import all indexes belonging to all views/sources/tables it
520/// references.
521///
522/// The input plans should be normalized with `NormalizeLets`! Otherwise, we might find dangling
523/// `ArrangeBy`s at the top of unused Let bindings.
524#[mz_ore::instrument(
525    target = "optimizer",
526    level = "debug",
527    fields(path.segment = "index_imports")
528)]
529fn prune_and_annotate_dataflow_index_imports(
530    dataflow: &mut DataflowDesc,
531    indexes: &dyn IndexOracle,
532    dataflow_metainfo: &mut DataflowMetainfo,
533) -> Result<(), TransformError> {
534    // Preparation.
535    // Let's save the unique keys of the sources. This will inform which indexes to choose for full
536    // scans. (We can't get this info from `source_imports`, because `source_imports` only has those
537    // sources that are not getting an indexed read.)
538    let mut source_keys = BTreeMap::new();
539    for build_desc in dataflow.objects_to_build.iter() {
540        build_desc
541            .plan
542            .as_inner()
543            .visit_pre(|expr: &MirRelationExpr| match expr {
544                MirRelationExpr::Get {
545                    id: Id::Global(global_id),
546                    typ,
547                    ..
548                } => {
549                    source_keys.entry(*global_id).or_insert_with(|| {
550                        typ.keys
551                            .iter()
552                            .map(|key| {
553                                key.iter()
554                                    // Convert the Vec<usize> key to Vec<MirScalarExpr>, so that
555                                    // later we can more easily compare index keys to these keys.
556                                    .map(|col_idx| MirScalarExpr::column(*col_idx))
557                                    .collect()
558                            })
559                            .collect()
560                    });
561                }
562                _ => {}
563            });
564    }
565
566    // This will be a mapping of
567    // (ids used by exports and objects to build) ->
568    // (arrangement keys and usage types on that id that have been requested)
569    let mut index_reqs_by_id = BTreeMap::new();
570
571    // Go through the MIR plans of `objects_to_build` and collect which arrangements are requested
572    // for which we also have an available index.
573    for build_desc in dataflow.objects_to_build.iter_mut() {
574        CollectIndexRequests::new(&source_keys, indexes, &mut index_reqs_by_id)
575            .collect_index_reqs(build_desc.plan.as_inner_mut())?;
576    }
577
578    // Collect index usages by `sink_exports`.
579    // A sink export sometimes wants to directly use an imported index. I know of one case where
580    // this happens: The dataflow for a SUBSCRIBE on an indexed view won't have any
581    // `objects_to_build`, but will want to directly read from the index and write to a sink.
582    for (_sink_id, sink_desc) in dataflow.sink_exports.iter() {
583        // First, let's see if there exists an index on the id that the sink wants. If not, there is
584        // nothing we can do here.
585        if let Some((idx_id, arbitrary_idx_key)) = indexes.indexes_on(sink_desc.from).next() {
586            // If yes, then we'll add a request of _some_ index: If we already collected an index
587            // request on this id, then use that, otherwise use the above arbitrarily picked index.
588            let requested_idxs = index_reqs_by_id
589                .entry(sink_desc.from)
590                .or_insert_with(Vec::new);
591            if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.get(0) {
592                requested_idxs.push((
593                    *already_req_idx_id,
594                    already_req_key.clone(),
595                    IndexUsageType::SinkExport,
596                ));
597            } else {
598                requested_idxs.push((
599                    idx_id,
600                    arbitrary_idx_key.to_owned(),
601                    IndexUsageType::SinkExport,
602                ));
603            }
604        }
605    }
606
607    // Collect index usages by `index_exports`.
608    for (_id, (index_desc, _)) in dataflow.index_exports.iter() {
609        // First, let's see if there exists an index on the id that the exported index is on. If
610        // not, there is nothing we can do here.
611        if let Some((idx_id, arbitrary_index_key)) = indexes.indexes_on(index_desc.on_id).next() {
612            // If yes, then we'll add an index request of some index: If we already collected an
613            // index request on this id, then use that, otherwise use the above arbitrarily picked
614            // index.
615            let requested_idxs = index_reqs_by_id
616                .entry(index_desc.on_id)
617                .or_insert_with(Vec::new);
618            if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.get(0) {
619                requested_idxs.push((
620                    *already_req_idx_id,
621                    already_req_key.clone(),
622                    IndexUsageType::IndexExport,
623                ));
624            } else {
625                // This is surprising: Actually, an index creation dataflow always has a plan in
626                // `objects_to_build` that will have a Get of the object that the index is on (see
627                // `DataflowDescription::export_index`). Therefore, we should have already requested
628                // an index usage when seeing that Get in `CollectIndexRequests`.
629                soft_panic_or_log!(
630                    "We are seeing an index export on an id that's not mentioned in `objects_to_build`"
631                );
632                requested_idxs.push((
633                    idx_id,
634                    arbitrary_index_key.to_owned(),
635                    IndexUsageType::IndexExport,
636                ));
637            }
638        }
639    }
640
641    // By now, `index_reqs_by_id` has all ids that we think might benefit from having an index on.
642    // Moreover, for each of these ids, if any index exists on it, then we should have already
643    // picked one. If not, then we have a bug somewhere. In that case, do a soft panic, and add an
644    // Unknown usage, picking an arbitrary index.
645    for (id, index_reqs) in index_reqs_by_id.iter_mut() {
646        if index_reqs.is_empty() {
647            // Try to pick an arbitrary index to be fully scanned.
648            if let Some((idx_id, key)) = indexes.indexes_on(*id).next() {
649                soft_panic_or_log!(
650                    "prune_and_annotate_dataflow_index_imports didn't find any index for an id, even though one exists
651id: {}, key: {:?}",
652                    id,
653                    key
654                );
655                index_reqs.push((idx_id, key.to_owned(), IndexUsageType::Unknown));
656            }
657        }
658    }
659
660    // Adjust FullScans to not introduce a new index dependency if there is also some non-FullScan
661    // request on the same id.
662    // `full_scan_changes` saves the changes that we do: Each (Get id, index id) entry indicates
663    // that if a Get has that id, then any full scan index accesses on it should be changed to use
664    // the indicated index id.
665    let mut full_scan_changes = BTreeMap::new();
666    for (get_id, index_reqs) in index_reqs_by_id.iter_mut() {
667        // Let's choose a non-FullScan access (if exists).
668        if let Some((picked_idx, picked_idx_key)) = choose_index(
669            &source_keys,
670            get_id,
671            &index_reqs
672                .iter()
673                .filter_map(|(idx_id, key, usage_type)| match usage_type {
674                    IndexUsageType::FullScan => None,
675                    _ => Some((*idx_id, key.clone())),
676                })
677                .collect_vec(),
678        ) {
679            // Found a non-FullScan access. Modify all FullScans to use the same index as that one.
680            for (idx_id, key, usage_type) in index_reqs {
681                match usage_type {
682                    IndexUsageType::FullScan => {
683                        full_scan_changes.insert(get_id, picked_idx);
684                        *idx_id = picked_idx;
685                        key.clone_from(&picked_idx_key);
686                    }
687                    _ => {}
688                }
689            }
690        }
691    }
692    // Apply the above full scan changes to also the Gets.
693    for build_desc in dataflow.objects_to_build.iter_mut() {
694        build_desc
695            .plan
696            .as_inner_mut()
697            .visit_pre_mut(|expr: &mut MirRelationExpr| {
698                match expr {
699                    MirRelationExpr::Get {
700                        id: Id::Global(global_id),
701                        typ: _,
702                        access_strategy: persist_or_index,
703                    } => {
704                        if let Some(new_idx_id) = full_scan_changes.get(global_id) {
705                            match persist_or_index {
706                                AccessStrategy::UnknownOrLocal => {
707                                    // Should have been already filled by `collect_index_reqs`.
708                                    unreachable!()
709                                }
710                                AccessStrategy::Persist => {
711                                    // We already know that it's an indexed access.
712                                    unreachable!()
713                                }
714                                AccessStrategy::SameDataflow => {
715                                    // We have not added such annotations yet.
716                                    unreachable!()
717                                }
718                                AccessStrategy::Index(accesses) => {
719                                    for (idx_id, usage_type) in accesses {
720                                        if matches!(usage_type, IndexUsageType::FullScan) {
721                                            *idx_id = *new_idx_id;
722                                        }
723                                    }
724                                }
725                            }
726                        }
727                    }
728                    _ => {}
729                }
730            });
731    }
732
733    // Annotate index imports by their usage types
734    dataflow_metainfo.index_usage_types = BTreeMap::new();
735    for (
736        index_id,
737        IndexImport {
738            desc: index_desc,
739            typ: _,
740            monotonic: _,
741            with_snapshot: _,
742        },
743    ) in dataflow.index_imports.iter_mut()
744    {
745        // A sanity check that we are not importing an index that we are also exporting.
746        assert!(
747            !dataflow
748                .index_exports
749                .iter()
750                .map(|(exported_index_id, _)| exported_index_id)
751                .any(|exported_index_id| exported_index_id == index_id)
752        );
753
754        let mut new_usage_types = Vec::new();
755        // Let's see whether something has requested an index on this object that this imported
756        // index is on.
757        if let Some(index_reqs) = index_reqs_by_id.get(&index_desc.on_id) {
758            for (req_idx_id, req_key, req_usage_type) in index_reqs {
759                if req_idx_id == index_id {
760                    soft_assert_eq_or_log!(*req_key, index_desc.key);
761                    new_usage_types.push(req_usage_type.clone());
762                }
763            }
764        }
765        if !new_usage_types.is_empty() {
766            dataflow_metainfo
767                .index_usage_types
768                .insert(*index_id, new_usage_types);
769        }
770    }
771
772    // Prune index imports to only those that are used
773    dataflow
774        .index_imports
775        .retain(|id, _index_import| dataflow_metainfo.index_usage_types.contains_key(id));
776
777    // Determine AccessStrategy::SameDataflow accesses. These were classified as
778    // AccessStrategy::Persist inside collect_index_reqs, so now we check these, and if the id is of
779    // a collection that we are building ourselves, then we adjust the access strategy.
780    let mut objects_to_build_ids = BTreeSet::new();
781    for BuildDesc { id, plan: _ } in dataflow.objects_to_build.iter() {
782        objects_to_build_ids.insert(id.clone());
783    }
784    for build_desc in dataflow.objects_to_build.iter_mut() {
785        build_desc
786            .plan
787            .as_inner_mut()
788            .visit_pre_mut(|expr: &mut MirRelationExpr| match expr {
789                MirRelationExpr::Get {
790                    id: Id::Global(global_id),
791                    typ: _,
792                    access_strategy,
793                } => match access_strategy {
794                    AccessStrategy::Persist => {
795                        if objects_to_build_ids.contains(global_id) {
796                            *access_strategy = AccessStrategy::SameDataflow;
797                        }
798                    }
799                    _ => {}
800                },
801                _ => {}
802            });
803    }
804
805    // A sanity check that all Get annotations indicate indexes that are present in `index_imports`.
806    for build_desc in dataflow.objects_to_build.iter() {
807        build_desc
808            .plan
809            .as_inner()
810            .visit_pre(|expr: &MirRelationExpr| match expr {
811                MirRelationExpr::Get {
812                    id: Id::Global(_),
813                    typ: _,
814                    access_strategy: AccessStrategy::Index(accesses),
815                } => {
816                    for (idx_id, _) in accesses {
817                        soft_assert_or_log!(
818                            dataflow.index_imports.contains_key(idx_id),
819                            "Dangling Get index annotation"
820                        );
821                    }
822                }
823                _ => {}
824            });
825    }
826
827    mz_repr::explain::trace_plan(dataflow);
828
829    Ok(())
830}
831
832/// Pick an index from a given Vec of index keys.
833///
834/// Currently, we pick as follows:
835///  - If there is an index on a unique key, then we pick that. (It might be better distributed, and
836///    is less likely to get dropped than other indexes.)
837///  - Otherwise, we pick an arbitrary index.
838///
839/// TODO: There are various edge cases where a better choice would be possible:
840/// - Some indexes might be less skewed than others. (Although, picking a unique key tries to
841///   capture this already.)
842/// - Some indexes might have an error, while others don't.
843///   <https://github.com/MaterializeInc/database-issues/issues/4455>
844/// - Some indexes might have more extra data in their keys (because of being on more complicated
845///   expressions than just column references), which won't be used in a full scan.
846fn choose_index(
847    source_keys: &BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
848    id: &GlobalId,
849    indexes: &Vec<(GlobalId, Vec<MirScalarExpr>)>,
850) -> Option<(GlobalId, Vec<MirScalarExpr>)> {
851    match source_keys.get(id) {
852        None => indexes.iter().next().cloned(), // pick an arbitrary index
853        Some(coll_keys) => match indexes
854            .iter()
855            .find(|(_idx_id, key)| coll_keys.contains(&*key))
856        {
857            Some((idx_id, key)) => Some((*idx_id, key.clone())),
858            None => indexes.iter().next().cloned(), // pick an arbitrary index
859        },
860    }
861}
862
/// State for a traversal of a `MirRelationExpr` that records which indexes the plan wants to
/// use (and how), and annotates each global `Get` with the resulting `AccessStrategy`.
///
/// The struct only borrows its inputs and its output map; results are accumulated in
/// `index_reqs_by_id`.
#[derive(Debug)]
struct CollectIndexRequests<'a> {
    /// We were told about these unique keys on sources.
    source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
    /// We were told about these indexes being available.
    indexes_available: &'a dyn IndexOracle,
    /// We'll be collecting index requests here.
    ///
    /// The map is keyed by the id of the collection being read; each entry is a list of
    /// `(index id, index key, usage type)` requests.
    index_reqs_by_id:
        &'a mut BTreeMap<GlobalId, Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>>,
    /// As we recurse down a MirRelationExpr, we'll need to keep track of the context of the
    /// current node (see docs on `IndexUsageContext` about what context we keep).
    /// Moreover, we need to propagate this context from cte uses to cte definitions.
    /// `context_across_lets` will keep track of the contexts that reached each use of a LocalId
    /// added together.
    context_across_lets: BTreeMap<LocalId, Vec<IndexUsageContext>>,
    /// Guards against overly deep recursion while traversing the expression.
    recursion_guard: RecursionGuard,
}
880
881impl<'a> CheckedRecursion for CollectIndexRequests<'a> {
882    fn recursion_guard(&self) -> &RecursionGuard {
883        &self.recursion_guard
884    }
885}
886
887impl<'a> CollectIndexRequests<'a> {
888    fn new(
889        source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
890        indexes_available: &'a dyn IndexOracle,
891        index_reqs_by_id: &'a mut BTreeMap<
892            GlobalId,
893            Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>,
894        >,
895    ) -> CollectIndexRequests<'a> {
896        CollectIndexRequests {
897            source_keys,
898            indexes_available,
899            index_reqs_by_id,
900            context_across_lets: BTreeMap::new(),
901            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
902        }
903    }
904
905    pub fn collect_index_reqs(
906        &mut self,
907        expr: &mut MirRelationExpr,
908    ) -> Result<(), RecursionLimitError> {
909        assert!(self.context_across_lets.is_empty());
910        self.collect_index_reqs_inner(
911            expr,
912            &IndexUsageContext::from_usage_type(IndexUsageType::PlanRootNoArrangement),
913        )?;
914        assert!(self.context_across_lets.is_empty());
915        // Sanity check that we don't have any `DeltaJoinIndexUsageType::Unknown` remaining.
916        for (_id, index_reqs) in self.index_reqs_by_id.iter() {
917            for (_, _, index_usage_type) in index_reqs {
918                soft_assert_or_log!(
919                    !matches!(
920                        index_usage_type,
921                        IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
922                    ),
923                    "Delta join Unknown index usage remained"
924                );
925            }
926        }
927        Ok(())
928    }
929
    /// Recursive worker of `collect_index_reqs`.
    ///
    /// `contexts` describes what the parents of `expr` want to do with an arrangement of
    /// `expr` (see `IndexUsageContext`). It holds more than one element only when `expr` is
    /// reached through several uses of a let binding. Depending on the kind of `expr`:
    ///
    /// - `Join`: recurses into each input with a fresh context derived from the join
    ///   implementation.
    /// - `ArrangeBy`: adds the requested keys to the incoming contexts and recurses into the
    ///   input.
    /// - `Get` of a global id: records the matching index requests in `index_reqs_by_id`, and
    ///   sets the access strategy of the `Get` to `AccessStrategy::Index` if at least one index
    ///   access was found, or to `AccessStrategy::Persist` otherwise.
    /// - `Get` of a local id: appends the incoming contexts to the saved contexts of that id.
    /// - `Let` / `LetRec`: recurses into the body first, and then into the value(s) with the
    ///   contexts that were collected from the uses of the binding(s).
    /// - Anything else: recurses into all children with an empty context.
    ///
    /// # Errors
    ///
    /// Returns a `RecursionLimitError` coming from `checked_recur_mut` or from a nested call
    /// (presumably when the expression is nested deeper than the recursion limit).
    fn collect_index_reqs_inner(
        &mut self,
        expr: &mut MirRelationExpr,
        contexts: &Vec<IndexUsageContext>,
    ) -> Result<(), RecursionLimitError> {
        self.checked_recur_mut(|this| {
            // If an index exists on `on_id`, this function picks an index to be fully scanned.
            let pick_index_for_full_scan = |on_id: &GlobalId| {
                // Note that the choice we make here might be modified later at the
                // "Adjust FullScans to not introduce a new index dependency".
                choose_index(
                    this.source_keys,
                    on_id,
                    &this
                        .indexes_available
                        .indexes_on(*on_id)
                        .map(|(idx_id, key)| (idx_id, key.iter().cloned().collect_vec()))
                        .collect_vec(),
                )
            };

            // See comment on `IndexUsageContext`.
            Ok(match expr {
                MirRelationExpr::Join {
                    inputs,
                    implementation,
                    ..
                } => {
                    match implementation {
                        JoinImplementation::Differential(..) => {
                            for input in inputs {
                                this.collect_index_reqs_inner(
                                    input,
                                    &IndexUsageContext::from_usage_type(
                                        IndexUsageType::DifferentialJoin,
                                    ),
                                )?;
                            }
                        }
                        JoinImplementation::DeltaQuery(..) => {
                            // For Delta joins, the first input is special, see
                            // https://github.com/MaterializeInc/database-issues/issues/2115
                            // The `Unknown` usage given to the first input is resolved into a
                            // full scan and/or lookups by `IndexUsageContext::add_keys`.
                            this.collect_index_reqs_inner(
                                &mut inputs[0],
                                &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(
                                    DeltaJoinIndexUsageType::Unknown,
                                )),
                            )?;
                            for input in &mut inputs[1..] {
                                this.collect_index_reqs_inner(
                                    input,
                                    &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(
                                        DeltaJoinIndexUsageType::Lookup,
                                    )),
                                )?;
                            }
                        }
                        JoinImplementation::IndexedFilter(_coll_id, idx_id, ..) => {
                            for input in inputs {
                                this.collect_index_reqs_inner(
                                    input,
                                    &IndexUsageContext::from_usage_type(IndexUsageType::Lookup(
                                        *idx_id,
                                    )),
                                )?;
                            }
                        }
                        JoinImplementation::Unimplemented => {
                            soft_panic_or_log!(
                                "CollectIndexRequests encountered an Unimplemented join"
                            );
                        }
                    }
                }
                MirRelationExpr::ArrangeBy { input, keys } => {
                    let ctx = &IndexUsageContext::add_keys(contexts, keys);
                    this.collect_index_reqs_inner(input, ctx)?;
                }
                MirRelationExpr::Get {
                    id: Id::Global(global_id),
                    access_strategy: persist_or_index,
                    ..
                } => {
                    // Make sure that an entry exists for this collection, so that the
                    // `get_mut(..).unwrap()` calls below can't fail.
                    this.index_reqs_by_id
                        .entry(*global_id)
                        .or_insert_with(Vec::new);
                    // If the context is empty, it means we didn't see an operator that would
                    // specifically want to use an index for this Get. However, let's still try to
                    // find an index for a full scan.
                    let mut try_full_scan = contexts.is_empty();
                    let mut index_accesses = Vec::new();
                    for context in contexts {
                        match &context.requested_keys {
                            None => {
                                // We have some index usage context, but didn't see an `ArrangeBy`.
                                try_full_scan = true;
                                match context.usage_type {
                                    IndexUsageType::FullScan
                                    | IndexUsageType::SinkExport
                                    | IndexUsageType::IndexExport => {
                                        // Not possible, because these don't go through
                                        // IndexUsageContext at all.
                                        unreachable!()
                                    }
                                    // You can find more info on why the following join cases
                                    // shouldn't happen in comments of the Join lowering to LIR.
                                    IndexUsageType::Lookup(_) => soft_panic_or_log!(
                                        "CollectIndexRequests encountered \
                                         an IndexedFilter join without an ArrangeBy"
                                    ),
                                    IndexUsageType::DifferentialJoin => soft_panic_or_log!(
                                        "CollectIndexRequests encountered \
                                         a Differential join without an ArrangeBy"
                                    ),
                                    IndexUsageType::DeltaJoin(_) => soft_panic_or_log!(
                                        "CollectIndexRequests encountered \
                                         a Delta join without an ArrangeBy"
                                    ),
                                    IndexUsageType::PlanRootNoArrangement => {
                                        // This is ok: the entire plan is a `Get`, with not even an
                                        // `ArrangeBy`. Note that if an index exists, the usage will
                                        // be saved as `FullScan` (NOT as `PlanRootNoArrangement`),
                                        // because we are going into the `try_full_scan` if.
                                    }
                                    IndexUsageType::FastPathLimit => {
                                        // These are created much later, not even inside
                                        // `prune_and_annotate_dataflow_index_imports`.
                                        unreachable!()
                                    }
                                    IndexUsageType::DanglingArrangeBy => {
                                        // Not possible, because we create `DanglingArrangeBy`
                                        // only when we see an `ArrangeBy`.
                                        unreachable!()
                                    }
                                    IndexUsageType::Unknown => {
                                        // These are added only after `CollectIndexRequests` has run.
                                        unreachable!()
                                    }
                                }
                            }
                            Some(requested_keys) => {
                                for requested_key in requested_keys {
                                    match this.indexes_available.indexes_on(*global_id).find(
                                        |(available_idx_id, available_key)| {
                                            match context.usage_type {
                                                IndexUsageType::Lookup(req_idx_id) => {
                                                    // `LiteralConstraints` already picked an index
                                                    // by id. Let's use that one.
                                                    // (The index with that id must have exactly
                                                    // the requested key.)
                                                    assert!(
                                                        !(available_idx_id == &req_idx_id
                                                            && available_key != &requested_key)
                                                    );
                                                    available_idx_id == &req_idx_id
                                                }
                                                _ => available_key == &requested_key,
                                            }
                                        },
                                    ) {
                                        Some((idx_id, key)) => {
                                            let usage = context.usage_type.clone();
                                            this.index_reqs_by_id
                                                .get_mut(global_id)
                                                .unwrap()
                                                .push((idx_id, key.to_owned(), usage.clone()));
                                            index_accesses.push((idx_id, usage));
                                        }
                                        None => {
                                            // If there is a key requested for which we don't have an
                                            // index, then we might still be able to do a full scan of a
                                            // differently keyed index.
                                            try_full_scan = true;
                                        }
                                    }
                                }
                                if requested_keys.is_empty() {
                                    // It's a bit weird if an MIR ArrangeBy is not requesting any
                                    // key, but let's try a full scan in that case anyhow.
                                    try_full_scan = true;
                                }
                            }
                        }
                    }
                    if try_full_scan {
                        // Keep in mind that when having 2 contexts coming from 2 uses of a Let,
                        // this code can't distinguish between the case when there is 1 ArrangeBy at the
                        // top of the Let, or when the 2 uses each have an `ArrangeBy`. In both cases,
                        // we'll add only 1 full scan, which would be wrong in the latter case. However,
                        // the latter case can't currently happen until we do
                        // https://github.com/MaterializeInc/database-issues/issues/6363
                        // Also note that currently we are deduplicating index usage types when
                        // printing index usages in EXPLAIN.
                        if let Some((idx_id, key)) = pick_index_for_full_scan(global_id) {
                            this.index_reqs_by_id.get_mut(global_id).unwrap().push((
                                idx_id,
                                key.to_owned(),
                                IndexUsageType::FullScan,
                            ));
                            index_accesses.push((idx_id, IndexUsageType::FullScan));
                        }
                    }
                    // Without any usable index, the collection is marked as read from Persist.
                    if index_accesses.is_empty() {
                        *persist_or_index = AccessStrategy::Persist;
                    } else {
                        *persist_or_index = AccessStrategy::Index(index_accesses);
                    }
                }
                MirRelationExpr::Get {
                    id: Id::Local(local_id),
                    ..
                } => {
                    // Add the current context to the vector of contexts of `local_id`.
                    // (The unwrap is safe, because the Let and LetRec cases start with inserting an
                    // empty entry.)
                    this.context_across_lets
                        .get_mut(local_id)
                        .unwrap()
                        .extend(contexts.iter().cloned());
                    // No recursive call here, because Get has no inputs.
                }
                MirRelationExpr::Let { id, value, body } => {
                    let shadowed_context = this.context_across_lets.insert(id.clone(), Vec::new());
                    // No shadowing in MIR
                    assert_none!(shadowed_context);
                    // We go backwards: Recurse on the body and then the value.
                    this.collect_index_reqs_inner(body, contexts)?;
                    // The above call filled in the entry for `id` in `context_across_lets` (if it
                    // was referenced). Anyhow, at least an empty entry should exist, because we started
                    // above with inserting it.
                    this.collect_index_reqs_inner(value, &this.context_across_lets[id].clone())?;
                    // Clean up the id from the saved contexts.
                    this.context_across_lets.remove(id);
                }
                MirRelationExpr::LetRec {
                    ids,
                    values,
                    limits: _,
                    body,
                } => {
                    for id in ids.iter() {
                        let shadowed_context =
                            this.context_across_lets.insert(id.clone(), Vec::new());
                        // No shadowing in MIR
                        assert_none!(shadowed_context);
                    }
                    // We go backwards: Recurse on the body first.
                    this.collect_index_reqs_inner(body, contexts)?;
                    // Reset the contexts of the ids (of the current LetRec), because an arrangement
                    // from a value can't be used in the body.
                    for id in ids.iter() {
                        *this.context_across_lets.get_mut(id).unwrap() = Vec::new();
                    }
                    // Recurse on the values in reverse order.
                    // Note that we do only one pass, i.e., we won't see context through a Get that
                    // refers to the previous iteration. But this is ok, because we can't reuse
                    // arrangements across iterations anyway.
                    for (id, value) in ids.iter().rev().zip_eq(values.iter_mut().rev()) {
                        this.collect_index_reqs_inner(
                            value,
                            &this.context_across_lets[id].clone(),
                        )?;
                    }
                    // Clean up the ids from the saved contexts.
                    for id in ids {
                        this.context_across_lets.remove(id);
                    }
                }
                _ => {
                    // Nothing interesting at this node, recurse with the empty context (regardless of
                    // what context we got from above).
                    let empty_context = Vec::new();
                    for child in expr.children_mut() {
                        this.collect_index_reqs_inner(child, &empty_context)?;
                    }
                }
            })
        })
    }
1207}
1208
/// This struct will save info about parent nodes as we are descending down a `MirRelationExpr`.
/// We always start with filling in `usage_type` when we see an operation that uses an arrangement,
/// and then we fill in `requested_keys` when we see an `ArrangeBy`. So, the pattern that we are
/// looking for is
/// ```text
/// <operation that uses an index>
///   ArrangeBy <requested_keys>
///     Get <global_id>
/// ```
/// When we reach a `Get` to a global id, we access this context struct to see if the rest of the
/// pattern is present above the `Get`.
///
/// Note that we usually put this struct in a Vec, because we track context across local let
/// bindings, which means that a node can have multiple parents.
#[derive(Debug, Clone)]
struct IndexUsageContext {
    /// What the operation above us wants to use the index for.
    usage_type: IndexUsageType,
    /// The keys requested by the `ArrangeBy`s that we have passed through so far. This is `None`
    /// until the first `ArrangeBy` is seen (see `IndexUsageContext::add_keys`).
    requested_keys: Option<BTreeSet<Vec<MirScalarExpr>>>,
}
1228
1229impl IndexUsageContext {
1230    pub fn from_usage_type(usage_type: IndexUsageType) -> Vec<Self> {
1231        vec![IndexUsageContext {
1232            usage_type,
1233            requested_keys: None,
1234        }]
1235    }
1236
1237    // Add the keys of an ArrangeBy into the contexts.
1238    // Soft_panics if haven't already seen something that indicates what the index will be used for.
1239    pub fn add_keys(
1240        old_contexts: &Vec<IndexUsageContext>,
1241        keys_to_add: &Vec<Vec<MirScalarExpr>>,
1242    ) -> Vec<IndexUsageContext> {
1243        let old_contexts = if old_contexts.is_empty() {
1244            // No join above us, and we are not at the root. Why does this ArrangeBy even exist?
1245            soft_panic_or_log!("CollectIndexRequests encountered a dangling ArrangeBy");
1246            // Anyhow, let's create a context with a `DanglingArrangeBy` index usage, so that we
1247            // have a place to note down the requested keys below.
1248            IndexUsageContext::from_usage_type(IndexUsageType::DanglingArrangeBy)
1249        } else {
1250            old_contexts.clone()
1251        };
1252        old_contexts
1253            .into_iter()
1254            .flat_map(|old_context| {
1255                if !matches!(
1256                    old_context.usage_type,
1257                    IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
1258                ) {
1259                    // If it's not an unknown delta join usage, then we simply note down the new
1260                    // keys into `requested_keys`.
1261                    let mut context = old_context.clone();
1262                    if context.requested_keys.is_none() {
1263                        context.requested_keys = Some(BTreeSet::new());
1264                    }
1265                    context
1266                        .requested_keys
1267                        .as_mut()
1268                        .unwrap()
1269                        .extend(keys_to_add.iter().cloned());
1270                    Some(context).into_iter().chain(None)
1271                } else {
1272                    // If it's an unknown delta join usage, then we need to figure out whether this
1273                    // is a full scan or a lookup.
1274                    //
1275                    // `source_key` in `DeltaPathPlan` determines which arrangement we are going to
1276                    // scan when starting the rendering of a delta path. This is the one for which
1277                    // we want a `DeltaJoinIndexUsageType::FirstInputFullScan`.
1278                    //
1279                    // However, `DeltaPathPlan` is an LIR concept, and here we need to figure out
1280                    // the `source_key` based on the MIR plan. We do this by doing the same as
1281                    // `DeltaJoinPlan::create_from`: choose the smallest key (by `Ord`).
1282                    let source_key = keys_to_add
1283                        .iter()
1284                        .min()
1285                        .expect("ArrangeBy below a delta join has at least one key");
1286                    let full_scan_context = IndexUsageContext {
1287                        requested_keys: Some(BTreeSet::from([source_key.clone()])),
1288                        usage_type: IndexUsageType::DeltaJoin(
1289                            DeltaJoinIndexUsageType::FirstInputFullScan,
1290                        ),
1291                    };
1292                    let lookup_keys = keys_to_add
1293                        .into_iter()
1294                        .filter(|key| *key != source_key)
1295                        .cloned()
1296                        .collect_vec();
1297                    if lookup_keys.is_empty() {
1298                        Some(full_scan_context).into_iter().chain(None)
1299                    } else {
1300                        let lookup_context = IndexUsageContext {
1301                            requested_keys: Some(lookup_keys.into_iter().collect()),
1302                            usage_type: IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Lookup),
1303                        };
1304                        Some(full_scan_context)
1305                            .into_iter()
1306                            .chain(Some(lookup_context))
1307                    }
1308                }
1309            })
1310            .collect()
1311    }
1312}
1313
/// Extra information about the dataflow. This is not going to be shipped, but has to be processed
/// in other ways, e.g., showing notices to the user, or saving meta-information to the catalog.
///
/// The struct is generic over the representation of the notices (`Notice`), which defaults to
/// [`RawOptimizerNotice`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
pub struct DataflowMetainfo<Notice = RawOptimizerNotice> {
    /// Notices that the optimizer wants to show to users.
    /// For pushing a new element, use [`Self::push_optimizer_notice_dedup`].
    pub optimizer_notices: Vec<Notice>,
    /// What kind of operation (full scan, lookup, ...) will access each index. Computed by
    /// `prune_and_annotate_dataflow_index_imports`.
    /// The map is keyed by the id of the index.
    pub index_usage_types: BTreeMap<GlobalId, Vec<IndexUsageType>>,
}
1326
1327impl<Notice> Default for DataflowMetainfo<Notice> {
1328    fn default() -> Self {
1329        DataflowMetainfo {
1330            optimizer_notices: Vec::new(),
1331            index_usage_types: BTreeMap::new(),
1332        }
1333    }
1334}
1335
1336impl<Notice> DataflowMetainfo<Notice> {
1337    /// Create a [`UsedIndexes`] instance by resolving each `id` in the
1338    /// `index_ids` iterator against an entry expected to exist in the
1339    /// [`DataflowMetainfo::index_usage_types`].
1340    pub fn used_indexes<T>(&self, df_desc: &DataflowDescription<T>) -> UsedIndexes {
1341        UsedIndexes::new(
1342            df_desc
1343                .index_imports
1344                .iter()
1345                .map(|(id, _)| {
1346                    let entry = self.index_usage_types.get(id).cloned();
1347                    // If an entry does not exist, mark the usage type for this
1348                    // index as `Unknown`.
1349                    //
1350                    // This should never happen if this method is called after
1351                    // running `prune_and_annotate_dataflow_index_imports` on
1352                    // the dataflow (this happens at the end of the
1353                    // `optimize_dataflow` call).
1354                    let index_usage_type = entry.unwrap_or_else(|| vec![IndexUsageType::Unknown]);
1355
1356                    (*id, index_usage_type)
1357                })
1358                .collect(),
1359        )
1360    }
1361}
1362
1363impl DataflowMetainfo<RawOptimizerNotice> {
1364    /// Pushes a [`RawOptimizerNotice`] into [`Self::optimizer_notices`], but
1365    /// only if the exact same notice is not already present.
1366    pub fn push_optimizer_notice_dedup<T>(&mut self, notice: T)
1367    where
1368        T: Into<RawOptimizerNotice>,
1369    {
1370        let notice = notice.into();
1371        if !self.optimizer_notices.contains(&notice) {
1372            self.optimizer_notices.push(notice);
1373        }
1374    }
1375}