// mz_transform/dataflow.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Whole-dataflow optimization
11//!
12//! A dataflow may contain multiple views, each of which may only be
13//! optimized locally. However, information like demand and predicate
14//! pushdown can be applied across views once we understand the context
15//! in which the views will be executed.
16
17use std::collections::{BTreeMap, BTreeSet};
18
19use itertools::Itertools;
20use mz_compute_types::dataflows::{BuildDesc, DataflowDesc, DataflowDescription, IndexImport};
21use mz_expr::{
22 AccessStrategy, CollectionPlan, Id, JoinImplementation, LocalId, MapFilterProject,
23 MirRelationExpr, MirScalarExpr, RECURSION_LIMIT,
24};
25use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError};
26use mz_ore::{assert_none, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
27use mz_repr::GlobalId;
28use mz_repr::explain::{DeltaJoinIndexUsageType, IndexUsageType, UsedIndexes};
29use proptest_derive::Arbitrary;
30use serde::{Deserialize, Serialize};
31
32use crate::monotonic::MonotonicFlag;
33use crate::notice::RawOptimizerNotice;
34use crate::{IndexOracle, Optimizer, TransformCtx, TransformError};
35
/// Optimizes the implementation of each dataflow.
///
/// Inlines views, performs a full optimization pass including physical
/// planning using the supplied indexes, propagates filtering and projection
/// information to dataflow sources and lifts monotonicity information.
///
/// When `fast_path_optimizer` is set, only the fast-path optimizer pass runs
/// after view inlining; otherwise the full pipeline runs: logical
/// optimization, filter/demand pushdown, logical cleanup, physical
/// optimization, and monotonicity propagation.
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment ="global")
)]
pub fn optimize_dataflow(
    dataflow: &mut DataflowDesc,
    transform_ctx: &mut TransformCtx,
    fast_path_optimizer: bool,
) -> Result<(), TransformError> {
    // Inline views that are used in only one other view.
    inline_views(dataflow)?;

    if fast_path_optimizer {
        // Only the fast-path optimizer pass is run.
        optimize_dataflow_relations(
            dataflow,
            &Optimizer::fast_path_optimizer(transform_ctx),
            transform_ctx,
        )?;
    } else {
        // Logical optimization pass after view inlining
        optimize_dataflow_relations(
            dataflow,
            #[allow(deprecated)]
            &Optimizer::logical_optimizer(transform_ctx),
            transform_ctx,
        )?;

        // Propagate predicates across views, towards the dataflow inputs.
        optimize_dataflow_filters(dataflow)?;
        // TODO: when the linear operator contract ensures that propagated
        // predicates are always applied, projections and filters can be removed
        // from where they come from. Once projections and filters can be removed,
        // TODO: it would be useful for demand to be optimized after filters
        // that way demand only includes the columns that are still necessary after
        // the filters are applied.
        optimize_dataflow_demand(dataflow)?;

        // A smaller logical optimization pass after projections and filters are
        // pushed down across views.
        optimize_dataflow_relations(
            dataflow,
            &Optimizer::logical_cleanup_pass(transform_ctx, false),
            transform_ctx,
        )?;

        // Physical optimization pass
        optimize_dataflow_relations(
            dataflow,
            &Optimizer::physical_optimizer(transform_ctx),
            transform_ctx,
        )?;

        optimize_dataflow_monotonic(dataflow, transform_ctx)?;
    }

    prune_and_annotate_dataflow_index_imports(
        dataflow,
        transform_ctx.indexes,
        transform_ctx.df_meta,
    )?;

    // Warning: If you want to add a transform call here, consider it very carefully whether it
    // could accidentally invalidate information that we already derived above in
    // `optimize_dataflow_monotonic` or `prune_and_annotate_dataflow_index_imports`.

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}
110
/// Inline views used in one other view, and in no exported objects.
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "inline_views")
)]
fn inline_views(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
    // We cannot inline anything whose `BuildDesc::id` appears in either the
    // `index_exports` or `sink_exports` of `dataflow`, because we lose our
    // ability to name it.

    // A view can / should be in-lined in another view if it is only used by
    // one subsequent view. If there are two distinct views that have not
    // themselves been merged, then too bad and it doesn't get inlined.

    // Starting from the *last* object to build, walk backwards and inline
    // any view that is referenced neither by `index_exports` nor by
    // `sink_exports`, and is referenced by exactly one later object to build.

    for index in (0..dataflow.objects_to_build.len()).rev() {
        // Capture the name used by others to reference this view.
        let global_id = dataflow.objects_to_build[index].id;
        // Determine if any exports directly reference this view.
        let mut occurs_in_export = false;
        for (_gid, sink_desc) in dataflow.sink_exports.iter() {
            if sink_desc.from == global_id {
                occurs_in_export = true;
            }
        }
        for (_, (index_desc, _)) in dataflow.index_exports.iter() {
            if index_desc.on_id == global_id {
                occurs_in_export = true;
            }
        }
        // Count the number of subsequent views that reference this view.
        let mut occurrences_in_later_views = Vec::new();
        for other in (index + 1)..dataflow.objects_to_build.len() {
            if dataflow.objects_to_build[other]
                .plan
                .depends_on()
                .contains(&global_id)
            {
                occurrences_in_later_views.push(other);
            }
        }
        // Inline if the view is referenced in one view and no exports.
        if !occurs_in_export && occurrences_in_later_views.len() == 1 {
            let other = occurrences_in_later_views[0];
            // We can remove this view and insert it in the later view,
            // but are not able to relocate the later view `other`.

            // When splicing in the `index` view, we need to create disjoint
            // identifiers for the Let's `body` and `value`, as well as a new
            // identifier for the binding itself. Following `NormalizeLets`, we
            // go with the binding first, then the value, then the body.
            let mut id_gen = crate::IdGen::default();
            let new_local = LocalId::new(id_gen.allocate_id());
            // Use the same `id_gen` to assign new identifiers to `index`.
            crate::normalize_lets::renumber_bindings(
                dataflow.objects_to_build[index].plan.as_inner_mut(),
                &mut id_gen,
            )?;
            // Assign new identifiers to the other relation.
            crate::normalize_lets::renumber_bindings(
                dataflow.objects_to_build[other].plan.as_inner_mut(),
                &mut id_gen,
            )?;
            // Install the `new_local` name wherever `global_id` was used.
            dataflow.objects_to_build[other]
                .plan
                .as_inner_mut()
                .visit_pre_mut(|expr| {
                    if let MirRelationExpr::Get { id, .. } = expr {
                        if id == &Id::Global(global_id) {
                            *id = Id::Local(new_local);
                        }
                    }
                });

            // With identifiers rewritten, we can replace `other` with
            // a `MirRelationExpr::Let` binding, whose value is `index` and
            // whose body is `other`.
            let body = dataflow.objects_to_build[other]
                .plan
                .as_inner_mut()
                .take_dangerous();
            let value = dataflow.objects_to_build[index]
                .plan
                .as_inner_mut()
                .take_dangerous();
            *dataflow.objects_to_build[other].plan.as_inner_mut() = MirRelationExpr::Let {
                id: new_local,
                value: Box::new(value),
                body: Box::new(body),
            };
            dataflow.objects_to_build.remove(index);
        }
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}
214
215/// Performs either the logical or the physical optimization pass on the
216/// dataflow using the supplied set of indexes.
217#[mz_ore::instrument(
218 target = "optimizer",
219 level = "debug",
220 fields(path.segment = optimizer.name)
221)]
222fn optimize_dataflow_relations(
223 dataflow: &mut DataflowDesc,
224 optimizer: &Optimizer,
225 ctx: &mut TransformCtx,
226) -> Result<(), TransformError> {
227 // Re-optimize each dataflow
228 for object in dataflow.objects_to_build.iter_mut() {
229 // Re-run all optimizations on the composite views.
230 ctx.set_global_id(object.id);
231 optimizer.transform(object.plan.as_inner_mut(), ctx)?;
232 ctx.reset_global_id();
233 }
234
235 mz_repr::explain::trace_plan(dataflow);
236
237 Ok(())
238}
239
240/// Pushes demand information from published outputs to dataflow inputs,
241/// projecting away unnecessary columns.
242///
243/// Dataflows that exist for the sake of generating plan explanations do not
244/// have published outputs. In this case, we push demand information from views
245/// not depended on by other views to dataflow inputs.
246#[mz_ore::instrument(
247 target = "optimizer",
248 level = "debug",
249 fields(path.segment ="demand")
250)]
251fn optimize_dataflow_demand(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
252 // Maps id -> union of known columns demanded from the source/view with the
253 // corresponding id.
254 let mut demand = BTreeMap::new();
255
256 if dataflow.index_exports.is_empty() && dataflow.sink_exports.is_empty() {
257 // In the absence of any exports, just demand all columns from views
258 // that are not depended on by another view, which is currently the last
259 // object in `objects_to_build`.
260
261 // A DataflowDesc without exports is currently created in the context of
262 // EXPLAIN outputs. This ensures that the output has all the columns of
263 // the original explainee.
264 if let Some(build_desc) = dataflow.objects_to_build.iter_mut().rev().next() {
265 demand
266 .entry(Id::Global(build_desc.id))
267 .or_insert_with(BTreeSet::new)
268 .extend(0..build_desc.plan.as_inner_mut().arity());
269 }
270 } else {
271 // Demand all columns of inputs to sinks.
272 for (_id, sink) in dataflow.sink_exports.iter() {
273 let input_id = sink.from;
274 demand
275 .entry(Id::Global(input_id))
276 .or_insert_with(BTreeSet::new)
277 .extend(0..dataflow.arity_of(&input_id));
278 }
279
280 // Demand all columns of inputs to exported indexes.
281 for (_id, (desc, _typ)) in dataflow.index_exports.iter() {
282 let input_id = desc.on_id;
283 demand
284 .entry(Id::Global(input_id))
285 .or_insert_with(BTreeSet::new)
286 .extend(0..dataflow.arity_of(&input_id));
287 }
288 }
289
290 optimize_dataflow_demand_inner(
291 dataflow
292 .objects_to_build
293 .iter_mut()
294 .rev()
295 .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
296 &mut demand,
297 )?;
298
299 mz_repr::explain::trace_plan(dataflow);
300
301 Ok(())
302}
303
304/// Pushes demand through views in `view_sequence` in order, removing
305/// columns not demanded.
306///
307/// This method is made public for the sake of testing.
308/// TODO: make this private once we allow multiple exports per dataflow.
309pub fn optimize_dataflow_demand_inner<'a, I>(
310 view_sequence: I,
311 demand: &mut BTreeMap<Id, BTreeSet<usize>>,
312) -> Result<(), TransformError>
313where
314 I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
315{
316 // Maps id -> The projection that was pushed down on the view with the
317 // corresponding id.
318 let mut applied_projection = BTreeMap::new();
319 // Collect the mutable references to views after pushing projection down
320 // in order to run cleanup actions on them in a second loop.
321 let mut view_refs = Vec::new();
322 let projection_pushdown = crate::movement::ProjectionPushdown::default();
323 for (id, view) in view_sequence {
324 if let Some(columns) = demand.get(&id) {
325 let projection_pushed_down = columns.iter().map(|c| *c).collect();
326 // Push down the projection consisting of the entries of `columns`
327 // in increasing order.
328 projection_pushdown.action(view, &projection_pushed_down, demand)?;
329 let new_type = view.typ();
330 applied_projection.insert(id, (projection_pushed_down, new_type));
331 }
332 view_refs.push(view);
333 }
334
335 for view in view_refs {
336 // Update `Get` nodes to reflect any columns that have been projected away.
337 projection_pushdown.update_projection_around_get(view, &applied_projection);
338 }
339
340 Ok(())
341}
342
343/// Pushes predicate to dataflow inputs.
344#[mz_ore::instrument(
345 target = "optimizer",
346 level = "debug",
347 fields(path.segment ="filters")
348)]
349fn optimize_dataflow_filters(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
350 // Contains id -> predicates map, describing those predicates that
351 // can (but need not) be applied to the collection named by `id`.
352 let mut predicates = BTreeMap::<Id, BTreeSet<mz_expr::MirScalarExpr>>::new();
353
354 // Propagate predicate information from outputs to inputs.
355 optimize_dataflow_filters_inner(
356 dataflow
357 .objects_to_build
358 .iter_mut()
359 .rev()
360 .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
361 &mut predicates,
362 )?;
363
364 // Push predicate information into the SourceDesc.
365 for (source_id, source_import) in dataflow.source_imports.iter_mut() {
366 let source = &mut source_import.desc;
367 if let Some(list) = predicates.remove(&Id::Global(*source_id)) {
368 if !list.is_empty() {
369 // Canonicalize the order of predicates, for stable plans.
370 let mut list = list.into_iter().collect::<Vec<_>>();
371 list.sort();
372 // Install no-op predicate information if none exists.
373 if source.arguments.operators.is_none() {
374 source.arguments.operators = Some(MapFilterProject::new(source.typ.arity()));
375 }
376 // Add any predicates that can be pushed to the source.
377 if let Some(operator) = source.arguments.operators.take() {
378 source.arguments.operators = Some(operator.filter(list));
379 source.arguments.operators.as_mut().map(|x| x.optimize());
380 }
381 }
382 }
383 }
384
385 mz_repr::explain::trace_plan(dataflow);
386
387 Ok(())
388}
389
390/// Pushes filters down through views in `view_sequence` in order.
391///
392/// This method is made public for the sake of testing.
393/// TODO: make this private once we allow multiple exports per dataflow.
394pub fn optimize_dataflow_filters_inner<'a, I>(
395 view_iter: I,
396 predicates: &mut BTreeMap<Id, BTreeSet<mz_expr::MirScalarExpr>>,
397) -> Result<(), TransformError>
398where
399 I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
400{
401 let transform = crate::predicate_pushdown::PredicatePushdown::default();
402 for (id, view) in view_iter {
403 if let Some(list) = predicates.get(&id).clone() {
404 if !list.is_empty() {
405 *view = view.take_dangerous().filter(list.iter().cloned());
406 }
407 }
408 transform.action(view, predicates)?;
409 }
410 Ok(())
411}
412
413/// Propagates information about monotonic inputs through operators,
414/// using [`mz_repr::optimize::OptimizerFeatures`] from `ctx` for [`crate::analysis::Analysis`].
415#[mz_ore::instrument(
416 target = "optimizer",
417 level = "debug",
418 fields(path.segment ="monotonic")
419)]
420pub fn optimize_dataflow_monotonic(
421 dataflow: &mut DataflowDesc,
422 ctx: &mut TransformCtx,
423) -> Result<(), TransformError> {
424 let mut monotonic_ids = BTreeSet::new();
425 for (source_id, source_import) in dataflow.source_imports.iter() {
426 if source_import.monotonic {
427 monotonic_ids.insert(source_id.clone());
428 }
429 }
430 for (
431 _index_id,
432 IndexImport {
433 desc: index_desc,
434 monotonic,
435 ..
436 },
437 ) in dataflow.index_imports.iter()
438 {
439 if *monotonic {
440 monotonic_ids.insert(index_desc.on_id.clone());
441 }
442 }
443
444 let monotonic_flag = MonotonicFlag::default();
445
446 for build_desc in dataflow.objects_to_build.iter_mut() {
447 monotonic_flag.transform(build_desc.plan.as_inner_mut(), ctx, &monotonic_ids)?;
448 }
449
450 mz_repr::explain::trace_plan(dataflow);
451
452 Ok(())
453}
454
/// Determine whether we require snapshots from our durable source imports.
/// (For example, these can often be skipped for simple subscribe queries.)
pub fn optimize_dataflow_snapshot(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
    // For every global id, true iff we need a snapshot for that global ID.
    // This is computed bottom-up: subscribes may or may not require a snapshot from their inputs,
    // index exports definitely do, and objects-to-build require a snapshot from their inputs if
    // either they need to provide a snapshot as output or they may need snapshots internally, eg. to
    // compute a join.
    let mut downstream_requires_snapshot = BTreeMap::new();

    // Sinks propagate their own `with_snapshot` requirement to their input.
    for (_id, export) in &dataflow.sink_exports {
        *downstream_requires_snapshot
            .entry(Id::Global(export.from))
            .or_default() |= export.with_snapshot;
    }
    // Exported indexes always require a snapshot of their input.
    for (_id, (export, _typ)) in &dataflow.index_exports {
        *downstream_requires_snapshot
            .entry(Id::Global(export.on_id))
            .or_default() |= true;
    }
    for BuildDesc { id: _, plan } in dataflow.objects_to_build.iter().rev() {
        // For now, we treat all intermediate nodes as potentially requiring a snapshot.
        // Walk the AST, marking anything depended on by a compute object as snapshot-required.
        let mut todo = vec![(true, &plan.0)];
        while let Some((requires_snapshot, expr)) = todo.pop() {
            match expr {
                MirRelationExpr::Get { id, .. } => {
                    *downstream_requires_snapshot.entry(*id).or_default() |= requires_snapshot;
                }
                other => {
                    // Note: children are pushed with `true` (conservatively),
                    // regardless of this node's own requirement.
                    todo.extend(other.children().rev().map(|c| (true, c)));
                }
            }
        }
    }
    for (id, import) in &mut dataflow.source_imports {
        let with_snapshot = downstream_requires_snapshot
            .entry(Id::Global(*id))
            .or_default();

        // As above, fetch the snapshot if there are any transformations on the raw source data.
        // (And we'll always need to check for things like temporal filters, since those allow
        // snapshot data to affect diffs at times past the as-of.)
        *with_snapshot |= import.desc.arguments.operators.is_some();

        import.with_snapshot = *with_snapshot;
    }
    for (_id, import) in &mut dataflow.index_imports {
        let with_snapshot = downstream_requires_snapshot
            .entry(Id::Global(import.desc.on_id))
            .or_default();

        import.with_snapshot = *with_snapshot;
    }

    Ok(())
}
512
/// Restricts the indexes imported by `dataflow` to only the ones it needs.
/// It also adds to the `DataflowMetainfo` how each index will be used.
/// It also annotates global `Get`s with whether they will be reads from Persist or an index, plus
/// their index usage types.
///
/// The input `dataflow` should import all indexes belonging to all views/sources/tables it
/// references.
///
/// The input plans should be normalized with `NormalizeLets`! Otherwise, we might find dangling
/// `ArrangeBy`s at the top of unused Let bindings.
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "index_imports")
)]
fn prune_and_annotate_dataflow_index_imports(
    dataflow: &mut DataflowDesc,
    indexes: &dyn IndexOracle,
    dataflow_metainfo: &mut DataflowMetainfo,
) -> Result<(), TransformError> {
    // Preparation.
    // Let's save the unique keys of the sources. This will inform which indexes to choose for full
    // scans. (We can't get this info from `source_imports`, because `source_imports` only has those
    // sources that are not getting an indexed read.)
    let mut source_keys = BTreeMap::new();
    for build_desc in dataflow.objects_to_build.iter() {
        build_desc
            .plan
            .as_inner()
            .visit_pre(|expr: &MirRelationExpr| match expr {
                MirRelationExpr::Get {
                    id: Id::Global(global_id),
                    typ,
                    ..
                } => {
                    source_keys.entry(*global_id).or_insert_with(|| {
                        typ.keys
                            .iter()
                            .map(|key| {
                                key.iter()
                                    // Convert the Vec<usize> key to Vec<MirScalarExpr>, so that
                                    // later we can more easily compare index keys to these keys.
                                    .map(|col_idx| MirScalarExpr::column(*col_idx))
                                    .collect()
                            })
                            .collect()
                    });
                }
                _ => {}
            });
    }

    // This will be a mapping of
    // (ids used by exports and objects to build) ->
    // (arrangement keys and usage types on that id that have been requested)
    let mut index_reqs_by_id = BTreeMap::new();

    // Go through the MIR plans of `objects_to_build` and collect which arrangements are requested
    // for which we also have an available index.
    for build_desc in dataflow.objects_to_build.iter_mut() {
        CollectIndexRequests::new(&source_keys, indexes, &mut index_reqs_by_id)
            .collect_index_reqs(build_desc.plan.as_inner_mut())?;
    }

    // Collect index usages by `sink_exports`.
    // A sink export sometimes wants to directly use an imported index. I know of one case where
    // this happens: The dataflow for a SUBSCRIBE on an indexed view won't have any
    // `objects_to_build`, but will want to directly read from the index and write to a sink.
    for (_sink_id, sink_desc) in dataflow.sink_exports.iter() {
        // First, let's see if there exists an index on the id that the sink wants. If not, there is
        // nothing we can do here.
        if let Some((idx_id, arbitrary_idx_key)) = indexes.indexes_on(sink_desc.from).next() {
            // If yes, then we'll add a request of _some_ index: If we already collected an index
            // request on this id, then use that, otherwise use the above arbitrarily picked index.
            let requested_idxs = index_reqs_by_id
                .entry(sink_desc.from)
                .or_insert_with(Vec::new);
            if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.get(0) {
                requested_idxs.push((
                    *already_req_idx_id,
                    already_req_key.clone(),
                    IndexUsageType::SinkExport,
                ));
            } else {
                requested_idxs.push((
                    idx_id,
                    arbitrary_idx_key.to_owned(),
                    IndexUsageType::SinkExport,
                ));
            }
        }
    }

    // Collect index usages by `index_exports`.
    for (_id, (index_desc, _)) in dataflow.index_exports.iter() {
        // First, let's see if there exists an index on the id that the exported index is on. If
        // not, there is nothing we can do here.
        if let Some((idx_id, arbitrary_index_key)) = indexes.indexes_on(index_desc.on_id).next() {
            // If yes, then we'll add an index request of some index: If we already collected an
            // index request on this id, then use that, otherwise use the above arbitrarily picked
            // index.
            let requested_idxs = index_reqs_by_id
                .entry(index_desc.on_id)
                .or_insert_with(Vec::new);
            if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.get(0) {
                requested_idxs.push((
                    *already_req_idx_id,
                    already_req_key.clone(),
                    IndexUsageType::IndexExport,
                ));
            } else {
                // This is surprising: Actually, an index creation dataflow always has a plan in
                // `objects_to_build` that will have a Get of the object that the index is on (see
                // `DataflowDescription::export_index`). Therefore, we should have already requested
                // an index usage when seeing that Get in `CollectIndexRequests`.
                soft_panic_or_log!(
                    "We are seeing an index export on an id that's not mentioned in `objects_to_build`"
                );
                requested_idxs.push((
                    idx_id,
                    arbitrary_index_key.to_owned(),
                    IndexUsageType::IndexExport,
                ));
            }
        }
    }

    // By now, `index_reqs_by_id` has all ids that we think might benefit from having an index on.
    // Moreover, for each of these ids, if any index exists on it, then we should have already
    // picked one. If not, then we have a bug somewhere. In that case, do a soft panic, and add an
    // Unknown usage, picking an arbitrary index.
    for (id, index_reqs) in index_reqs_by_id.iter_mut() {
        if index_reqs.is_empty() {
            // Try to pick an arbitrary index to be fully scanned.
            if let Some((idx_id, key)) = indexes.indexes_on(*id).next() {
                soft_panic_or_log!(
                    "prune_and_annotate_dataflow_index_imports didn't find any index for an id, even though one exists
id: {}, key: {:?}",
                    id,
                    key
                );
                index_reqs.push((idx_id, key.to_owned(), IndexUsageType::Unknown));
            }
        }
    }

    // Adjust FullScans to not introduce a new index dependency if there is also some non-FullScan
    // request on the same id.
    // `full_scan_changes` saves the changes that we do: Each (Get id, index id) entry indicates
    // that if a Get has that id, then any full scan index accesses on it should be changed to use
    // the indicated index id.
    let mut full_scan_changes = BTreeMap::new();
    for (get_id, index_reqs) in index_reqs_by_id.iter_mut() {
        // Let's choose a non-FullScan access (if exists).
        if let Some((picked_idx, picked_idx_key)) = choose_index(
            &source_keys,
            get_id,
            &index_reqs
                .iter()
                .filter_map(|(idx_id, key, usage_type)| match usage_type {
                    IndexUsageType::FullScan => None,
                    _ => Some((*idx_id, key.clone())),
                })
                .collect_vec(),
        ) {
            // Found a non-FullScan access. Modify all FullScans to use the same index as that one.
            for (idx_id, key, usage_type) in index_reqs {
                match usage_type {
                    IndexUsageType::FullScan => {
                        full_scan_changes.insert(get_id, picked_idx);
                        *idx_id = picked_idx;
                        key.clone_from(&picked_idx_key);
                    }
                    _ => {}
                }
            }
        }
    }
    // Apply the above full scan changes to also the Gets.
    for build_desc in dataflow.objects_to_build.iter_mut() {
        build_desc
            .plan
            .as_inner_mut()
            .visit_pre_mut(|expr: &mut MirRelationExpr| {
                match expr {
                    MirRelationExpr::Get {
                        id: Id::Global(global_id),
                        typ: _,
                        access_strategy: persist_or_index,
                    } => {
                        if let Some(new_idx_id) = full_scan_changes.get(global_id) {
                            match persist_or_index {
                                AccessStrategy::UnknownOrLocal => {
                                    // Should have been already filled by `collect_index_reqs`.
                                    unreachable!()
                                }
                                AccessStrategy::Persist => {
                                    // We already know that it's an indexed access.
                                    unreachable!()
                                }
                                AccessStrategy::SameDataflow => {
                                    // We have not added such annotations yet.
                                    unreachable!()
                                }
                                AccessStrategy::Index(accesses) => {
                                    for (idx_id, usage_type) in accesses {
                                        if matches!(usage_type, IndexUsageType::FullScan) {
                                            *idx_id = *new_idx_id;
                                        }
                                    }
                                }
                            }
                        }
                    }
                    _ => {}
                }
            });
    }

    // Annotate index imports by their usage types.
    // Reset any previously recorded usage types; we recompute them from scratch below.
    dataflow_metainfo.index_usage_types = BTreeMap::new();
    for (
        index_id,
        IndexImport {
            desc: index_desc,
            typ: _,
            monotonic: _,
            with_snapshot: _,
        },
    ) in dataflow.index_imports.iter_mut()
    {
        // A sanity check that we are not importing an index that we are also exporting.
        assert!(
            !dataflow
                .index_exports
                .iter()
                .map(|(exported_index_id, _)| exported_index_id)
                .any(|exported_index_id| exported_index_id == index_id)
        );

        let mut new_usage_types = Vec::new();
        // Let's see whether something has requested an index on this object that this imported
        // index is on.
        if let Some(index_reqs) = index_reqs_by_id.get(&index_desc.on_id) {
            for (req_idx_id, req_key, req_usage_type) in index_reqs {
                if req_idx_id == index_id {
                    soft_assert_eq_or_log!(*req_key, index_desc.key);
                    new_usage_types.push(req_usage_type.clone());
                }
            }
        }
        if !new_usage_types.is_empty() {
            dataflow_metainfo
                .index_usage_types
                .insert(*index_id, new_usage_types);
        }
    }

    // Prune index imports to only those that are used
    dataflow
        .index_imports
        .retain(|id, _index_import| dataflow_metainfo.index_usage_types.contains_key(id));

    // Determine AccessStrategy::SameDataflow accesses. These were classified as
    // AccessStrategy::Persist inside collect_index_reqs, so now we check these, and if the id is of
    // a collection that we are building ourselves, then we adjust the access strategy.
    let mut objects_to_build_ids = BTreeSet::new();
    for BuildDesc { id, plan: _ } in dataflow.objects_to_build.iter() {
        objects_to_build_ids.insert(id.clone());
    }
    for build_desc in dataflow.objects_to_build.iter_mut() {
        build_desc
            .plan
            .as_inner_mut()
            .visit_pre_mut(|expr: &mut MirRelationExpr| match expr {
                MirRelationExpr::Get {
                    id: Id::Global(global_id),
                    typ: _,
                    access_strategy,
                } => match access_strategy {
                    AccessStrategy::Persist => {
                        if objects_to_build_ids.contains(global_id) {
                            *access_strategy = AccessStrategy::SameDataflow;
                        }
                    }
                    _ => {}
                },
                _ => {}
            });
    }

    // A sanity check that all Get annotations indicate indexes that are present in `index_imports`.
    for build_desc in dataflow.objects_to_build.iter() {
        build_desc
            .plan
            .as_inner()
            .visit_pre(|expr: &MirRelationExpr| match expr {
                MirRelationExpr::Get {
                    id: Id::Global(_),
                    typ: _,
                    access_strategy: AccessStrategy::Index(accesses),
                } => {
                    for (idx_id, _) in accesses {
                        soft_assert_or_log!(
                            dataflow.index_imports.contains_key(idx_id),
                            "Dangling Get index annotation"
                        );
                    }
                }
                _ => {}
            });
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}
830
831/// Pick an index from a given Vec of index keys.
832///
833/// Currently, we pick as follows:
834/// - If there is an index on a unique key, then we pick that. (It might be better distributed, and
835/// is less likely to get dropped than other indexes.)
836/// - Otherwise, we pick an arbitrary index.
837///
838/// TODO: There are various edge cases where a better choice would be possible:
839/// - Some indexes might be less skewed than others. (Although, picking a unique key tries to
840/// capture this already.)
841/// - Some indexes might have an error, while others don't.
842/// <https://github.com/MaterializeInc/database-issues/issues/4455>
843/// - Some indexes might have more extra data in their keys (because of being on more complicated
844/// expressions than just column references), which won't be used in a full scan.
845fn choose_index(
846 source_keys: &BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
847 id: &GlobalId,
848 indexes: &Vec<(GlobalId, Vec<MirScalarExpr>)>,
849) -> Option<(GlobalId, Vec<MirScalarExpr>)> {
850 match source_keys.get(id) {
851 None => indexes.iter().next().cloned(), // pick an arbitrary index
852 Some(coll_keys) => match indexes
853 .iter()
854 .find(|(_idx_id, key)| coll_keys.contains(&*key))
855 {
856 Some((idx_id, key)) => Some((*idx_id, key.clone())),
857 None => indexes.iter().next().cloned(), // pick an arbitrary index
858 },
859 }
860}
861
/// Collects requests for indexed arrangements while walking MIR plans.
/// See `prune_and_annotate_dataflow_index_imports` for how the collected
/// requests are used.
#[derive(Debug)]
struct CollectIndexRequests<'a> {
    /// We were told about these unique keys on sources.
    source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
    /// We were told about these indexes being available.
    indexes_available: &'a dyn IndexOracle,
    /// We'll be collecting index requests here.
    index_reqs_by_id:
        &'a mut BTreeMap<GlobalId, Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>>,
    /// As we recurse down a MirRelationExpr, we'll need to keep track of the context of the
    /// current node (see docs on `IndexUsageContext` about what context we keep).
    /// Moreover, we need to propagate this context from cte uses to cte definitions.
    /// `context_across_lets` will keep track of the contexts that reached each use of a LocalId
    /// added together.
    context_across_lets: BTreeMap<LocalId, Vec<IndexUsageContext>>,
    /// Bounds the depth of the recursive plan traversal.
    recursion_guard: RecursionGuard,
}
879
impl<'a> CheckedRecursion for CollectIndexRequests<'a> {
    /// Exposes the guard that `checked_recur_mut` uses to bound recursion depth.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
885
886impl<'a> CollectIndexRequests<'a> {
    /// Creates a collector that records index requests into `index_reqs_by_id`,
    /// consulting `source_keys` and `indexes_available` when choosing indexes.
    /// The recursion guard is initialized with the standard `RECURSION_LIMIT`.
    fn new(
        source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
        indexes_available: &'a dyn IndexOracle,
        index_reqs_by_id: &'a mut BTreeMap<
            GlobalId,
            Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>,
        >,
    ) -> CollectIndexRequests<'a> {
        CollectIndexRequests {
            source_keys,
            indexes_available,
            index_reqs_by_id,
            context_across_lets: BTreeMap::new(),
            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
        }
    }
903
904 pub fn collect_index_reqs(
905 &mut self,
906 expr: &mut MirRelationExpr,
907 ) -> Result<(), RecursionLimitError> {
908 assert!(self.context_across_lets.is_empty());
909 self.collect_index_reqs_inner(
910 expr,
911 &IndexUsageContext::from_usage_type(IndexUsageType::PlanRootNoArrangement),
912 )?;
913 assert!(self.context_across_lets.is_empty());
914 // Sanity check that we don't have any `DeltaJoinIndexUsageType::Unknown` remaining.
915 for (_id, index_reqs) in self.index_reqs_by_id.iter() {
916 for (_, _, index_usage_type) in index_reqs {
917 soft_assert_or_log!(
918 !matches!(
919 index_usage_type,
920 IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
921 ),
922 "Delta join Unknown index usage remained"
923 );
924 }
925 }
926 Ok(())
927 }
928
929 fn collect_index_reqs_inner(
930 &mut self,
931 expr: &mut MirRelationExpr,
932 contexts: &Vec<IndexUsageContext>,
933 ) -> Result<(), RecursionLimitError> {
934 self.checked_recur_mut(|this| {
935 // If an index exists on `on_id`, this function picks an index to be fully scanned.
936 let pick_index_for_full_scan = |on_id: &GlobalId| {
937 // Note that the choice we make here might be modified later at the
938 // "Adjust FullScans to not introduce a new index dependency".
939 choose_index(
940 this.source_keys,
941 on_id,
942 &this
943 .indexes_available
944 .indexes_on(*on_id)
945 .map(|(idx_id, key)| (idx_id, key.iter().cloned().collect_vec()))
946 .collect_vec(),
947 )
948 };
949
950 // See comment on `IndexUsageContext`.
951 Ok(match expr {
952 MirRelationExpr::Join {
953 inputs,
954 implementation,
955 ..
956 } => {
957 match implementation {
958 JoinImplementation::Differential(..) => {
959 for input in inputs {
960 this.collect_index_reqs_inner(
961 input,
962 &IndexUsageContext::from_usage_type(
963 IndexUsageType::DifferentialJoin,
964 ),
965 )?;
966 }
967 }
968 JoinImplementation::DeltaQuery(..) => {
969 // For Delta joins, the first input is special, see
970 // https://github.com/MaterializeInc/database-issues/issues/2115
971 this.collect_index_reqs_inner(
972 &mut inputs[0],
973 &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(
974 DeltaJoinIndexUsageType::Unknown,
975 )),
976 )?;
977 for input in &mut inputs[1..] {
978 this.collect_index_reqs_inner(
979 input,
980 &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(
981 DeltaJoinIndexUsageType::Lookup,
982 )),
983 )?;
984 }
985 }
986 JoinImplementation::IndexedFilter(_coll_id, idx_id, ..) => {
987 for input in inputs {
988 this.collect_index_reqs_inner(
989 input,
990 &IndexUsageContext::from_usage_type(IndexUsageType::Lookup(
991 *idx_id,
992 )),
993 )?;
994 }
995 }
996 JoinImplementation::Unimplemented => {
997 soft_panic_or_log!(
998 "CollectIndexRequests encountered an Unimplemented join"
999 );
1000 }
1001 }
1002 }
1003 MirRelationExpr::ArrangeBy { input, keys } => {
1004 let ctx = &IndexUsageContext::add_keys(contexts, keys);
1005 this.collect_index_reqs_inner(input, ctx)?;
1006 }
1007 MirRelationExpr::Get {
1008 id: Id::Global(global_id),
1009 access_strategy: persist_or_index,
1010 ..
1011 } => {
1012 this.index_reqs_by_id
1013 .entry(*global_id)
1014 .or_insert_with(Vec::new);
1015 // If the context is empty, it means we didn't see an operator that would
1016 // specifically want to use an index for this Get. However, let's still try to
1017 // find an index for a full scan.
1018 let mut try_full_scan = contexts.is_empty();
1019 let mut index_accesses = Vec::new();
1020 for context in contexts {
1021 match &context.requested_keys {
1022 None => {
1023 // We have some index usage context, but didn't see an `ArrangeBy`.
1024 try_full_scan = true;
1025 match context.usage_type {
1026 IndexUsageType::FullScan
1027 | IndexUsageType::SinkExport
1028 | IndexUsageType::IndexExport => {
1029 // Not possible, because these don't go through
1030 // IndexUsageContext at all.
1031 unreachable!()
1032 }
1033 // You can find more info on why the following join cases
1034 // shouldn't happen in comments of the Join lowering to LIR.
1035 IndexUsageType::Lookup(_) => soft_panic_or_log!(
1036 "CollectIndexRequests encountered \
1037 an IndexedFilter join without an ArrangeBy"
1038 ),
1039 IndexUsageType::DifferentialJoin => soft_panic_or_log!(
1040 "CollectIndexRequests encountered \
1041 a Differential join without an ArrangeBy"
1042 ),
1043 IndexUsageType::DeltaJoin(_) => soft_panic_or_log!(
1044 "CollectIndexRequests encountered \
1045 a Delta join without an ArrangeBy"
1046 ),
1047 IndexUsageType::PlanRootNoArrangement => {
1048 // This is ok: the entire plan is a `Get`, with not even an
1049 // `ArrangeBy`. Note that if an index exists, the usage will
1050 // be saved as `FullScan` (NOT as `PlanRootNoArrangement`),
1051 // because we are going into the `try_full_scan` if.
1052 }
1053 IndexUsageType::FastPathLimit => {
1054 // These are created much later, not even inside
1055 // `prune_and_annotate_dataflow_index_imports`.
1056 unreachable!()
1057 }
1058 IndexUsageType::DanglingArrangeBy => {
1059 // Not possible, because we create `DanglingArrangeBy`
1060 // only when we see an `ArrangeBy`.
1061 unreachable!()
1062 }
1063 IndexUsageType::Unknown => {
1064 // These are added only after `CollectIndexRequests` has run.
1065 unreachable!()
1066 }
1067 }
1068 }
1069 Some(requested_keys) => {
1070 for requested_key in requested_keys {
1071 match this.indexes_available.indexes_on(*global_id).find(
1072 |(available_idx_id, available_key)| {
1073 match context.usage_type {
1074 IndexUsageType::Lookup(req_idx_id) => {
1075 // `LiteralConstraints` already picked an index
1076 // by id. Let's use that one.
1077 assert!(
1078 !(available_idx_id == &req_idx_id
1079 && available_key != &requested_key)
1080 );
1081 available_idx_id == &req_idx_id
1082 }
1083 _ => available_key == &requested_key,
1084 }
1085 },
1086 ) {
1087 Some((idx_id, key)) => {
1088 let usage = context.usage_type.clone();
1089 this.index_reqs_by_id
1090 .get_mut(global_id)
1091 .unwrap()
1092 .push((idx_id, key.to_owned(), usage.clone()));
1093 index_accesses.push((idx_id, usage));
1094 }
1095 None => {
1096 // If there is a key requested for which we don't have an
1097 // index, then we might still be able to do a full scan of a
1098 // differently keyed index.
1099 try_full_scan = true;
1100 }
1101 }
1102 }
1103 if requested_keys.is_empty() {
1104 // It's a bit weird if an MIR ArrangeBy is not requesting any
1105 // key, but let's try a full scan in that case anyhow.
1106 try_full_scan = true;
1107 }
1108 }
1109 }
1110 }
1111 if try_full_scan {
1112 // Keep in mind that when having 2 contexts coming from 2 uses of a Let,
1113 // this code can't distinguish between the case when there is 1 ArrangeBy at the
1114 // top of the Let, or when the 2 uses each have an `ArrangeBy`. In both cases,
1115 // we'll add only 1 full scan, which would be wrong in the latter case. However,
1116 // the latter case can't currently happen until we do
1117 // https://github.com/MaterializeInc/database-issues/issues/6363
1118 // Also note that currently we are deduplicating index usage types when
1119 // printing index usages in EXPLAIN.
1120 if let Some((idx_id, key)) = pick_index_for_full_scan(global_id) {
1121 this.index_reqs_by_id.get_mut(global_id).unwrap().push((
1122 idx_id,
1123 key.to_owned(),
1124 IndexUsageType::FullScan,
1125 ));
1126 index_accesses.push((idx_id, IndexUsageType::FullScan));
1127 }
1128 }
1129 if index_accesses.is_empty() {
1130 *persist_or_index = AccessStrategy::Persist;
1131 } else {
1132 *persist_or_index = AccessStrategy::Index(index_accesses);
1133 }
1134 }
1135 MirRelationExpr::Get {
1136 id: Id::Local(local_id),
1137 ..
1138 } => {
1139 // Add the current context to the vector of contexts of `local_id`.
1140 // (The unwrap is safe, because the Let and LetRec cases start with inserting an
1141 // empty entry.)
1142 this.context_across_lets
1143 .get_mut(local_id)
1144 .unwrap()
1145 .extend(contexts.iter().cloned());
1146 // No recursive call here, because Get has no inputs.
1147 }
1148 MirRelationExpr::Let { id, value, body } => {
1149 let shadowed_context = this.context_across_lets.insert(id.clone(), Vec::new());
1150 // No shadowing in MIR
1151 assert_none!(shadowed_context);
1152 // We go backwards: Recurse on the body and then the value.
1153 this.collect_index_reqs_inner(body, contexts)?;
1154 // The above call filled in the entry for `id` in `context_across_lets` (if it
1155 // was referenced). Anyhow, at least an empty entry should exist, because we started
1156 // above with inserting it.
1157 this.collect_index_reqs_inner(value, &this.context_across_lets[id].clone())?;
1158 // Clean up the id from the saved contexts.
1159 this.context_across_lets.remove(id);
1160 }
1161 MirRelationExpr::LetRec {
1162 ids,
1163 values,
1164 limits: _,
1165 body,
1166 } => {
1167 for id in ids.iter() {
1168 let shadowed_context =
1169 this.context_across_lets.insert(id.clone(), Vec::new());
1170 // No shadowing in MIR
1171 assert_none!(shadowed_context);
1172 }
1173 // We go backwards: Recurse on the body first.
1174 this.collect_index_reqs_inner(body, contexts)?;
1175 // Reset the contexts of the ids (of the current LetRec), because an arrangement
1176 // from a value can't be used in the body.
1177 for id in ids.iter() {
1178 *this.context_across_lets.get_mut(id).unwrap() = Vec::new();
1179 }
1180 // Recurse on the values in reverse order.
1181 // Note that we do only one pass, i.e., we won't see context through a Get that
1182 // refers to the previous iteration. But this is ok, because we can't reuse
1183 // arrangements across iterations anyway.
1184 for (id, value) in ids.iter().rev().zip_eq(values.iter_mut().rev()) {
1185 this.collect_index_reqs_inner(
1186 value,
1187 &this.context_across_lets[id].clone(),
1188 )?;
1189 }
1190 // Clean up the ids from the saved contexts.
1191 for id in ids {
1192 this.context_across_lets.remove(id);
1193 }
1194 }
1195 _ => {
1196 // Nothing interesting at this node, recurse with the empty context (regardless of
1197 // what context we got from above).
1198 let empty_context = Vec::new();
1199 for child in expr.children_mut() {
1200 this.collect_index_reqs_inner(child, &empty_context)?;
1201 }
1202 }
1203 })
1204 })
1205 }
1206}
1207
1208/// This struct will save info about parent nodes as we are descending down a `MirRelationExpr`.
1209/// We always start with filling in `usage_type` when we see an operation that uses an arrangement,
1210/// and then we fill in `requested_keys` when we see an `ArrangeBy`. So, the pattern that we are
1211/// looking for is
1212/// ```text
1213/// <operation that uses an index>
1214/// ArrangeBy <requested_keys>
1215/// Get <global_id>
1216/// ```
1217/// When we reach a `Get` to a global id, we access this context struct to see if the rest of the
1218/// pattern is present above the `Get`.
1219///
1220/// Note that we usually put this struct in a Vec, because we track context across local let
1221/// bindings, which means that a node can have multiple parents.
#[derive(Debug, Clone)]
struct IndexUsageContext {
    /// What the ancestor operator would use an arrangement for, e.g., a delta join lookup.
    usage_type: IndexUsageType,
    /// The keys collected from `ArrangeBy`s seen between the above operator and the `Get`.
    /// `None` means we haven't seen an `ArrangeBy` (yet); `add_keys` fills this in.
    requested_keys: Option<BTreeSet<Vec<MirScalarExpr>>>,
}
1227
1228impl IndexUsageContext {
1229 pub fn from_usage_type(usage_type: IndexUsageType) -> Vec<Self> {
1230 vec![IndexUsageContext {
1231 usage_type,
1232 requested_keys: None,
1233 }]
1234 }
1235
1236 // Add the keys of an ArrangeBy into the contexts.
1237 // Soft_panics if haven't already seen something that indicates what the index will be used for.
1238 pub fn add_keys(
1239 old_contexts: &Vec<IndexUsageContext>,
1240 keys_to_add: &Vec<Vec<MirScalarExpr>>,
1241 ) -> Vec<IndexUsageContext> {
1242 let old_contexts = if old_contexts.is_empty() {
1243 // No join above us, and we are not at the root. Why does this ArrangeBy even exist?
1244 soft_panic_or_log!("CollectIndexRequests encountered a dangling ArrangeBy");
1245 // Anyhow, let's create a context with a `DanglingArrangeBy` index usage, so that we
1246 // have a place to note down the requested keys below.
1247 IndexUsageContext::from_usage_type(IndexUsageType::DanglingArrangeBy)
1248 } else {
1249 old_contexts.clone()
1250 };
1251 old_contexts
1252 .into_iter()
1253 .flat_map(|old_context| {
1254 if !matches!(
1255 old_context.usage_type,
1256 IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
1257 ) {
1258 // If it's not an unknown delta join usage, then we simply note down the new
1259 // keys into `requested_keys`.
1260 let mut context = old_context.clone();
1261 if context.requested_keys.is_none() {
1262 context.requested_keys = Some(BTreeSet::new());
1263 }
1264 context
1265 .requested_keys
1266 .as_mut()
1267 .unwrap()
1268 .extend(keys_to_add.iter().cloned());
1269 Some(context).into_iter().chain(None)
1270 } else {
1271 // If it's an unknown delta join usage, then we need to figure out whether this
1272 // is a full scan or a lookup.
1273 //
1274 // `source_key` in `DeltaPathPlan` determines which arrangement we are going to
1275 // scan when starting the rendering of a delta path. This is the one for which
1276 // we want a `DeltaJoinIndexUsageType::FirstInputFullScan`.
1277 //
1278 // However, `DeltaPathPlan` is an LIR concept, and here we need to figure out
1279 // the `source_key` based on the MIR plan. We do this by doing the same as
1280 // `DeltaJoinPlan::create_from`: choose the smallest key (by `Ord`).
1281 let source_key = keys_to_add
1282 .iter()
1283 .min()
1284 .expect("ArrangeBy below a delta join has at least one key");
1285 let full_scan_context = IndexUsageContext {
1286 requested_keys: Some(BTreeSet::from([source_key.clone()])),
1287 usage_type: IndexUsageType::DeltaJoin(
1288 DeltaJoinIndexUsageType::FirstInputFullScan,
1289 ),
1290 };
1291 let lookup_keys = keys_to_add
1292 .into_iter()
1293 .filter(|key| *key != source_key)
1294 .cloned()
1295 .collect_vec();
1296 if lookup_keys.is_empty() {
1297 Some(full_scan_context).into_iter().chain(None)
1298 } else {
1299 let lookup_context = IndexUsageContext {
1300 requested_keys: Some(lookup_keys.into_iter().collect()),
1301 usage_type: IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Lookup),
1302 };
1303 Some(full_scan_context)
1304 .into_iter()
1305 .chain(Some(lookup_context))
1306 }
1307 }
1308 })
1309 .collect()
1310 }
1311}
1312
/// Extra information about the dataflow. This is not going to be shipped, but has to be processed
/// in other ways, e.g., showing notices to the user, or saving meta-information to the catalog.
///
/// The `Notice` type parameter is the representation of optimizer notices; it defaults to the
/// raw form emitted by the optimizer, [`RawOptimizerNotice`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct DataflowMetainfo<Notice = RawOptimizerNotice> {
    /// Notices that the optimizer wants to show to users.
    /// For pushing a new element, use [`Self::push_optimizer_notice_dedup`].
    pub optimizer_notices: Vec<Notice>,
    /// What kind of operation (full scan, lookup, ...) will access each index. Computed by
    /// `prune_and_annotate_dataflow_index_imports`.
    pub index_usage_types: BTreeMap<GlobalId, Vec<IndexUsageType>>,
}
1324
1325impl<Notice> Default for DataflowMetainfo<Notice> {
1326 fn default() -> Self {
1327 DataflowMetainfo {
1328 optimizer_notices: Vec::new(),
1329 index_usage_types: BTreeMap::new(),
1330 }
1331 }
1332}
1333
1334impl<Notice> DataflowMetainfo<Notice> {
1335 /// Create a [`UsedIndexes`] instance by resolving each `id` in the
1336 /// `index_ids` iterator against an entry expected to exist in the
1337 /// [`DataflowMetainfo::index_usage_types`].
1338 pub fn used_indexes<T>(&self, df_desc: &DataflowDescription<T>) -> UsedIndexes {
1339 UsedIndexes::new(
1340 df_desc
1341 .index_imports
1342 .iter()
1343 .map(|(id, _)| {
1344 let entry = self.index_usage_types.get(id).cloned();
1345 // If an entry does not exist, mark the usage type for this
1346 // index as `Unknown`.
1347 //
1348 // This should never happen if this method is called after
1349 // running `prune_and_annotate_dataflow_index_imports` on
1350 // the dataflow (this happens at the end of the
1351 // `optimize_dataflow` call).
1352 let index_usage_type = entry.unwrap_or_else(|| vec![IndexUsageType::Unknown]);
1353
1354 (*id, index_usage_type)
1355 })
1356 .collect(),
1357 )
1358 }
1359}
1360
1361impl DataflowMetainfo<RawOptimizerNotice> {
1362 /// Pushes a [`RawOptimizerNotice`] into [`Self::optimizer_notices`], but
1363 /// only if the exact same notice is not already present.
1364 pub fn push_optimizer_notice_dedup<T>(&mut self, notice: T)
1365 where
1366 T: Into<RawOptimizerNotice>,
1367 {
1368 let notice = notice.into();
1369 if !self.optimizer_notices.contains(¬ice) {
1370 self.optimizer_notices.push(notice);
1371 }
1372 }
1373}