mz_transform/join_implementation.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// Clippy's cognitive complexity is easy to reach.
11//#![allow(clippy::cognitive_complexity)]
12
13//! Determines the join implementation for join operators.
14//!
15//! This includes determining the type of join (e.g. differential linear, or delta queries),
16//! determining the orders of collections, lifting predicates if useful arrangements exist,
17//! and identifying opportunities to use indexes to replace filters.
18
19use std::collections::BTreeMap;
20
21use itertools::Itertools;
22use mz_expr::JoinImplementation::{Differential, IndexedFilter, Unimplemented};
23use mz_expr::visit::{Visit, VisitChildren};
24use mz_expr::{
25 FilterCharacteristics, Id, JoinInputCharacteristics, JoinInputMapper, MapFilterProject,
26 MirRelationExpr, MirScalarExpr, RECURSION_LIMIT,
27};
28use mz_ore::stack::{CheckedRecursion, RecursionGuard};
29use mz_ore::{soft_assert_or_log, soft_panic_or_log};
30use mz_repr::optimize::OptimizerFeatures;
31
32use crate::analysis::{Cardinality, DerivedBuilder};
33use crate::join_implementation::index_map::IndexMap;
34use crate::predicate_pushdown::PredicatePushdown;
35use crate::{StatisticsOracle, TransformCtx, TransformError};
36
37/// Determines the join implementation for join operators.
38#[derive(Debug)]
39pub struct JoinImplementation {
40 recursion_guard: RecursionGuard,
41}
42
43impl Default for JoinImplementation {
44 /// Construct a new [`JoinImplementation`] where `recursion_guard`
45 /// is initialized with [`RECURSION_LIMIT`] as limit.
46 fn default() -> JoinImplementation {
47 JoinImplementation {
48 recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
49 }
50 }
51}
52
53impl CheckedRecursion for JoinImplementation {
54 fn recursion_guard(&self) -> &RecursionGuard {
55 &self.recursion_guard
56 }
57}
58
59impl crate::Transform for JoinImplementation {
60 fn name(&self) -> &'static str {
61 "JoinImplementation"
62 }
63
64 #[mz_ore::instrument(
65 target = "optimizer",
66 level = "debug",
67 fields(path.segment = "join_implementation")
68 )]
69 fn actually_perform_transform(
70 &self,
71 relation: &mut MirRelationExpr,
72 ctx: &mut TransformCtx,
73 ) -> Result<(), TransformError> {
74 let result = self.action_recursive(
75 relation,
76 &mut IndexMap::new(ctx.indexes),
77 ctx.stats,
78 ctx.features,
79 );
80 mz_repr::explain::trace_plan(&*relation);
81 result
82 }
83}
84
85impl JoinImplementation {
86 /// Pre-order visitor for each `MirRelationExpr` to find join operators.
87 ///
88 /// This method accumulates state about let-bound arrangements, so that
89 /// join operators can more accurately assess their available arrangements.
90 pub fn action_recursive(
91 &self,
92 relation: &mut MirRelationExpr,
93 indexes: &mut IndexMap,
94 stats: &dyn StatisticsOracle,
95 features: &OptimizerFeatures,
96 ) -> Result<(), TransformError> {
97 self.checked_recur(|_| {
98 if let MirRelationExpr::Let { id, value, body } = relation {
99 self.action_recursive(value, indexes, stats, features)?;
100 match &**value {
101 MirRelationExpr::ArrangeBy { keys, .. } => {
102 for key in keys {
103 indexes.add_local(*id, key.clone());
104 }
105 }
106 MirRelationExpr::Reduce { group_key, .. } => {
107 indexes.add_local(
108 *id,
109 (0..group_key.len()).map(MirScalarExpr::column).collect(),
110 );
111 }
112 _ => {}
113 }
114 self.action_recursive(body, indexes, stats, features)?;
115 indexes.remove_local(*id);
116 Ok(())
117 } else {
118 let (mfp, mfp_input) =
119 MapFilterProject::extract_non_errors_from_expr_ref_mut(relation);
120 mfp_input.try_visit_mut_children(|e| {
121 self.action_recursive(e, indexes, stats, features)
122 })?;
123 self.action(mfp_input, mfp, indexes, stats, features)?;
124 Ok(())
125 }
126 })
127 }
128
129 /// Determines the join implementation for join operators.
130 pub fn action(
131 &self,
132 relation: &mut MirRelationExpr,
133 mfp_above: MapFilterProject,
134 indexes: &IndexMap,
135 stats: &dyn StatisticsOracle,
136 features: &OptimizerFeatures,
137 ) -> Result<(), TransformError> {
138 if let MirRelationExpr::Join {
139 inputs,
140 equivalences,
141 // (Note that `JoinImplementation` runs in a fixpoint loop.)
142 // If the current implementation is
143 // - Unimplemented, then we need to come up with an implementation.
144 // - Differential, then we consider switching to a Delta join, because we might have
145 // inserted some ArrangeBys that create new arrangements when we came up with the
146 // Differential plan, in which case a Delta join might have become viable.
147 // - Delta, then we are good already.
148 // - IndexedFilter, then we just leave that alone, because those are out of scope
149 // for JoinImplementation (they are created by `LiteralConstraints`).
150 // We don't want to change from a Differential plan to an other Differential plan, or
151 // from a Delta plan to an other Delta plan, because the second run cannot distinguish
152 // between an ArrangeBy that marks an already existing arrangement and an ArrangeBy
153 // that was inserted by a previous run of JoinImplementation. (We should eventually
154 // refactor this to make ArrangeBy unambiguous somehow. Maybe move JoinImplementation
155 // to the lowering.)
156 implementation: implementation @ (Unimplemented | Differential(..)),
157 } = relation
158 {
159 // If we eagerly plan delta joins, we don't need the second run to "pick up" delta joins
160 // that could be planned with the arrangements from a differential. If such a delta
161 // join were viable, we'd have already planned it the first time.
162 if features.enable_eager_delta_joins && !matches!(implementation, Unimplemented) {
163 return Ok(());
164 }
165
166 let input_types = inputs.iter().map(|i| i.typ()).collect::<Vec<_>>();
167
168 // Canonicalize the equivalence classes
169 if matches!(implementation, Unimplemented) {
170 // Let's do this only if it's the first run of JoinImplementation, in which case we
171 // are guaranteed to produce a new plan, which will be compatible with the modified
172 // equivalences from the below call. Otherwise, if we already have a Differential or
173 // a Delta join, then we might discard the new plan and go with the old plan, which
174 // was created previously for the old equivalences, and might be invalid for the
175 // modified equivalences from the below call. Note that this issue can arise only if
176 // `canonicalize_equivalences` is not idempotent, which unfortunately seems to be
177 // the case.
178 mz_expr::canonicalize::canonicalize_equivalences(
179 equivalences,
180 input_types.iter().map(|t| &t.column_types),
181 );
182 }
183
184 // Common information of broad utility.
185 let input_mapper = JoinInputMapper::new_from_input_types(&input_types);
186
187 // The first fundamental question is whether we should employ a delta query or not.
188 //
189 // Here we conservatively use the rule that if sufficient arrangements exist we will
190 // use a delta query (except for 2-input joins). (With eager delta joins, we will
191 // settle for fewer arrangements in the delta join than in the differential join.)
192 //
193 // An arrangement is considered available for an input
194 // - if it is a `Get` with columns present in `indexes`,
195 // - or the same wrapped by an IndexedFilter,
196 // - if it is an `ArrangeBy` with the columns present (note that the ArrangeBy might
197 // have been inserted by a previous run of JoinImplementation),
198 // - if it is a `Reduce` whose output is arranged the right way,
199 // - if it is a filter wrapped around either of these (see the mfp extraction).
200 //
201 // The `IndexedFilter` case above is to avoid losing some Delta joins
202 // due to `IndexedFilter` on a join input. This means that in the absolute worst
203 // case (when the `IndexedFilter` doesn't filter out anything), we will fully
204 // re-create some arrangements that we already have for that input. This worst case
205 // is still better than what can happen if we lose a Delta join: Differential joins
206 // will create several new arrangements that doesn't even have a size bound, i.e.,
207 // they might be larger than any user-created index.
208
209 let unique_keys = input_types
210 .into_iter()
211 .map(|typ| typ.keys)
212 .collect::<Vec<_>>();
213 let mut available_arrangements = vec![Vec::new(); inputs.len()];
214 let mut filters = Vec::with_capacity(inputs.len());
215 let mut cardinalities = Vec::with_capacity(inputs.len());
216
217 // We figure out what predicates from mfp_above could be pushed to which input.
218 // We won't actually push these down now; this just informs FilterCharacteristics.
219 let (map, mut filter, _) = mfp_above.as_map_filter_project();
220 let all_errors = filter.iter().all(|p| p.is_literal_err());
221 let (_, pushed_through_map) = PredicatePushdown::push_filters_through_map(
222 &map,
223 &mut filter,
224 mfp_above.input_arity,
225 all_errors,
226 )?;
227 let (_, push_downs) = PredicatePushdown::push_filters_through_join(
228 &input_mapper,
229 equivalences,
230 pushed_through_map,
231 );
232
233 for index in 0..inputs.len() {
234 // We can work around mfps, as we can lift the mfps into the join execution.
235 let (mfp, input) = MapFilterProject::extract_non_errors_from_expr(&inputs[index]);
236 let (_, filter, project) = mfp.as_map_filter_project();
237
238 // We gather filter characteristics:
239 // - From the filter that is directly at the top mfp of the input.
240 // - IndexedFilter joins are constructed from literal equality filters.
241 // - If the input is an ArrangeBy, then we gather filter characteristics from
242 // the mfp below the ArrangeBy. (JoinImplementation often inserts ArrangeBys.)
243 // - From filters that could be pushed down from above the join to this input.
244 // (In LIR, these will be executed right after the join path executes the join
245 // for this input.)
246 // - (No need to look behind Gets, see the inline_mfp argument of RelationCSE.)
247 let mut characteristics = FilterCharacteristics::filter_characteristics(&filter)?;
248 if matches!(
249 input,
250 MirRelationExpr::Join {
251 implementation: IndexedFilter(..),
252 ..
253 }
254 ) {
255 characteristics.add_literal_equality();
256 }
257 if let MirRelationExpr::ArrangeBy {
258 input: arrange_by_input,
259 ..
260 } = input
261 {
262 let (mfp, input) =
263 MapFilterProject::extract_non_errors_from_expr(arrange_by_input);
264 let (_, filter, _) = mfp.as_map_filter_project();
265 characteristics |= FilterCharacteristics::filter_characteristics(&filter)?;
266 if matches!(
267 input,
268 MirRelationExpr::Join {
269 implementation: IndexedFilter(..),
270 ..
271 }
272 ) {
273 characteristics.add_literal_equality();
274 }
275 }
276 let push_down_characteristics =
277 FilterCharacteristics::filter_characteristics(&push_downs[index])?;
278 let push_down_factor = push_down_characteristics.worst_case_scaling_factor();
279 characteristics |= push_down_characteristics;
280
281 // Estimate cardinality
282 if features.enable_cardinality_estimates {
283 let mut builder = DerivedBuilder::new(features);
284 // TODO(mgree): it would be good to not have to copy the statistics here
285 builder.require(Cardinality::with_stats(stats.as_map()));
286 let derived = builder.visit(input);
287
288 let estimate = *derived.as_view().value::<Cardinality>().unwrap();
289 // we've already accounted for the filters _in_ the term; these capture the ones above
290 let scaled = estimate * push_down_factor;
291 cardinalities.push(scaled.rounded());
292 } else {
293 cardinalities.push(None);
294 }
295
296 filters.push(characteristics);
297
298 // Collect available arrangements on this input.
299 match input {
300 MirRelationExpr::Get { id, typ: _, .. } => {
301 available_arrangements[index]
302 .extend(indexes.get(*id).map(|key| key.to_vec()));
303 }
304 MirRelationExpr::ArrangeBy { input, keys } => {
305 // We may use any presented arrangement keys.
306 available_arrangements[index].extend(keys.clone());
307 if let MirRelationExpr::Get { id, typ: _, .. } = &**input {
308 available_arrangements[index]
309 .extend(indexes.get(*id).map(|key| key.to_vec()));
310 }
311 }
312 MirRelationExpr::Reduce { group_key, .. } => {
313 // The first `group_key.len()` columns form an arrangement key.
314 available_arrangements[index]
315 .push((0..group_key.len()).map(MirScalarExpr::column).collect());
316 }
317 MirRelationExpr::Join {
318 implementation: IndexedFilter(id, ..),
319 ..
320 } => {
321 available_arrangements[index]
322 .extend(indexes.get(Id::Global(id.clone())).map(|key| key.to_vec()));
323 }
324 _ => {}
325 }
326 available_arrangements[index].sort();
327 available_arrangements[index].dedup();
328 let reverse_project = project
329 .into_iter()
330 .enumerate()
331 .map(|(i, c)| (c, i))
332 .collect::<BTreeMap<_, _>>();
333 // Eliminate arrangements referring to columns that have been
334 // projected away by surrounding MFPs.
335 available_arrangements[index].retain(|key| {
336 key.iter()
337 .all(|k| k.support().iter().all(|c| reverse_project.contains_key(c)))
338 });
339 // Permute arrangements so columns reference what is after the MFP.
340 for key in available_arrangements[index].iter_mut() {
341 for k in key.iter_mut() {
342 k.permute_map(&reverse_project);
343 }
344 }
345 // Currently we only support using arrangements all of whose
346 // key components can be found in some equivalence.
347 // Note: because `order_input` currently only finds arrangements
348 // with exact key matches, the code below can be removed with no
349 // change in behavior, but this is being kept for a future
350 // TODO: expand `order_input`
351 available_arrangements[index].retain(|key| {
352 key.iter().all(|k| {
353 let k = input_mapper.map_expr_to_global(k.clone(), index);
354 equivalences
355 .iter()
356 .any(|equivalence| equivalence.contains(&k))
357 })
358 });
359 }
360
361 let old_implementation = implementation.clone();
362 let num_inputs = inputs.len();
363 // We've already planned a differential join... should we replace it with a delta join?
364 //
365 // This code path is only active when `eager_delta_joins` is false.
366 if matches!(old_implementation, Differential(..)) {
367 soft_assert_or_log!(
368 !features.enable_eager_delta_joins,
369 "eager delta joins run join implementation just once"
370 );
371
372 // Binary joins can't be delta joins---give up.
373 if inputs.len() <= 2 {
374 return Ok(());
375 }
376
377 // Only plan a delta join if it's no new arrangements (beyond what differential planned).
378 if let Ok((delta_query_plan, 0)) = delta_queries::plan(
379 relation,
380 &input_mapper,
381 &available_arrangements,
382 &unique_keys,
383 &cardinalities,
384 &filters,
385 features,
386 ) {
387 tracing::debug!(plan = ?delta_query_plan, "replacing differential join with delta join");
388 *relation = delta_query_plan;
389 }
390
391 return Ok(());
392 }
393
394 // To have reached here, we must be in our first run of join planning.
395 //
396 // We plan a differential join first.
397 let (differential_query_plan, differential_new_arrangements) = differential::plan(
398 relation,
399 &input_mapper,
400 &available_arrangements,
401 &unique_keys,
402 &cardinalities,
403 &filters,
404 features,
405 )
406 .expect("Failed to produce a differential join plan");
407
408 // Binary joins _must_ be differential. We won't plan a delta join.
409 if num_inputs <= 2 {
410 // if inputs.len() == 0 then something is very wrong.
411 soft_assert_or_log!(num_inputs != 0, "join with no inputs");
412 // if inputs.len() == 1:
413 // Single input joins are filters and should be planned as
414 // differential plans instead of delta queries. Because a
415 // a filter gets converted into a single input join only when
416 // there are existing arrangements, without this early return,
417 // filters will always be planned as delta queries.
418 // Note: This can actually occur, see github-24511.slt.
419 //
420 // if inputs.len() == 2:
421 // We decided to always plan this as a differential join for now, because the usual
422 // advantage of a Delta join avoiding intermediate arrangements doesn't apply.
423 // See more details here:
424 // https://github.com/MaterializeInc/materialize/pull/16099#issuecomment-1316857374
425 // https://github.com/MaterializeInc/materialize/pull/17708#discussion_r1112848747
426 *relation = differential_query_plan;
427
428 return Ok(());
429 }
430
431 // We are planning a multiway join for the first time.
432 //
433 // We compare the delta and differential join plans.
434 //
435 // A delta query requires that, for every path, there is an arrangement for every
436 // input except for the starting one. Such queries are viable when:
437 //
438 // (a) all the arrangements already exist, or
439 // (b) both:
440 // (i) we wouldn't create more arrangements than a differential join would
441 // (ii) `enable_eager_delta_joins` is on
442 //
443 // A differential join of k relations requires k-2 arrangements of intermediate
444 // results (plus k arrangements of the inputs).
445 //
446 // Consider A ⨝ B ⨝ C ⨝ D. If planned as a differential join, we might have:
447 // A » B » C » D
448 // This corresponds to the tree:
449 //
450 // A B
451 // \ /
452 // ⨝ C
453 // \ /
454 // ⨝ D
455 // \ /
456 // ⨝
457 //
458 // At the two internal joins, the differential join will need two new arrangements.
459 //
460 // TODO(mgree): with this refactoring, we should compute `orders` once---both joins
461 // call `optimize_orders` and we can save some work.
462 match delta_queries::plan(
463 relation,
464 &input_mapper,
465 &available_arrangements,
466 &unique_keys,
467 &cardinalities,
468 &filters,
469 features,
470 ) {
471 // If delta plan's inputs need no new arrangements, pick the delta plan.
472 Ok((delta_query_plan, 0)) => {
473 soft_assert_or_log!(
474 matches!(old_implementation, Unimplemented | Differential(..)),
475 "delta query plans should not be planned twice"
476 );
477 tracing::debug!(
478 plan = ?delta_query_plan,
479 differential_new_arrangements = differential_new_arrangements,
480 "picking delta query plan (no new arrangements)");
481 *relation = delta_query_plan;
482 }
483 // If the delta plan needs new arrangements, compare with the differential plan.
484 Ok((delta_query_plan, delta_new_arrangements)) => {
485 tracing::debug!(
486 delta_new_arrangements = delta_new_arrangements,
487 differential_new_arrangements = differential_new_arrangements,
488 "comparing delta and differential joins",
489 );
490
491 if features.enable_eager_delta_joins
492 && delta_new_arrangements <= differential_new_arrangements
493 {
494 // If we're eagerly planning delta joins, pick the delta plan if it's more economical.
495 tracing::debug!(
496 plan = ?delta_query_plan,
497 "picking delta query plan");
498 *relation = delta_query_plan;
499 } else if let Unimplemented = old_implementation {
500 // If we haven't planned the join yet, use the differential plan.
501 tracing::debug!(
502 plan = ?differential_query_plan,
503 "picking differential query plan");
504 *relation = differential_query_plan;
505 } else {
506 // But don't replace an existing differential plan.
507 tracing::debug!(plan = ?old_implementation, "keeping old plan");
508 soft_assert_or_log!(
509 matches!(old_implementation, Differential(..)),
510 "implemented plan in second run of join implementation should be differential \
511 if the delta plan is not viable"
512 )
513 }
514 }
515 // If we can't plan a delta join, plan a differential join.
516 Err(err) => {
517 soft_panic_or_log!("delta planning failed: {err}");
518 tracing::debug!(
519 plan = ?differential_query_plan,
520 "picking differential query plan (delta planning failed)");
521 *relation = differential_query_plan;
522 }
523 }
524 }
525 Ok(())
526 }
527}
528
529mod index_map {
530 use std::collections::BTreeMap;
531
532 use mz_expr::{Id, LocalId, MirScalarExpr};
533
534 use crate::IndexOracle;
535
536 /// Keeps track of local and global indexes available while descending
537 /// a `MirRelationExpr`.
538 #[derive(Debug)]
539 pub struct IndexMap<'a> {
540 local: BTreeMap<LocalId, Vec<Vec<MirScalarExpr>>>,
541 global: &'a dyn IndexOracle,
542 }
543
544 impl IndexMap<'_> {
545 /// Creates a new index map with knowledge of the provided global indexes.
546 pub fn new(global: &dyn IndexOracle) -> IndexMap<'_> {
547 IndexMap {
548 local: BTreeMap::new(),
549 global,
550 }
551 }
552
553 /// Adds a local index on the specified collection with the specified key.
554 pub fn add_local(&mut self, id: LocalId, key: Vec<MirScalarExpr>) {
555 self.local.entry(id).or_default().push(key)
556 }
557
558 /// Removes all local indexes on the specified collection.
559 pub fn remove_local(&mut self, id: LocalId) {
560 self.local.remove(&id);
561 }
562
563 pub fn get(&self, id: Id) -> Box<dyn Iterator<Item = &[MirScalarExpr]> + '_> {
564 match id {
565 Id::Global(id) => Box::new(self.global.indexes_on(id).map(|(_idx_id, key)| key)),
566 Id::Local(id) => Box::new(
567 self.local
568 .get(&id)
569 .into_iter()
570 .flatten()
571 .map(|x| x.as_slice()),
572 ),
573 }
574 }
575 }
576}
577
578mod delta_queries {
579
580 use std::collections::BTreeSet;
581
582 use mz_expr::{
583 FilterCharacteristics, JoinImplementation, JoinInputMapper, MirRelationExpr, MirScalarExpr,
584 };
585 use mz_repr::optimize::OptimizerFeatures;
586
587 use crate::TransformError;
588
589 /// Creates a delta query plan, and any predicates that need to be lifted.
590 /// It also returns the number of new arrangements necessary for this plan.
591 ///
592 /// The method returns `Err` if any errors occur during planning.
593 pub fn plan(
594 join: &MirRelationExpr,
595 input_mapper: &JoinInputMapper,
596 available: &[Vec<Vec<MirScalarExpr>>],
597 unique_keys: &[Vec<Vec<usize>>],
598 cardinalities: &[Option<usize>],
599 filters: &[FilterCharacteristics],
600 optimizer_features: &OptimizerFeatures,
601 ) -> Result<(MirRelationExpr, usize), TransformError> {
602 let mut new_join = join.clone();
603
604 if let MirRelationExpr::Join {
605 inputs,
606 equivalences,
607 implementation,
608 } = &mut new_join
609 {
610 // Determine a viable order for each relation, or return `Err` if none found.
611 let orders = super::optimize_orders(
612 equivalences,
613 available,
614 unique_keys,
615 cardinalities,
616 filters,
617 input_mapper,
618 optimizer_features.enable_join_prioritize_arranged,
619 )?;
620
621 // Count new arrangements.
622 let new_arrangements: usize = orders
623 .iter()
624 .flat_map(|o| {
625 o.iter().skip(1).filter_map(|(c, key, input)| {
626 if c.arranged() {
627 None
628 } else {
629 Some((input, key))
630 }
631 })
632 })
633 .collect::<BTreeSet<_>>()
634 .len();
635
636 // Convert the order information into specific (input, key, characteristics) information.
637 let mut orders = orders
638 .into_iter()
639 .map(|o| {
640 o.into_iter()
641 .skip(1)
642 .map(|(c, key, r)| (r, key, Some(c)))
643 .collect::<Vec<_>>()
644 })
645 .collect::<Vec<_>>();
646
647 // Implement arrangements in each of the inputs.
648 let (lifted_mfp, lifted_projections) =
649 super::implement_arrangements(inputs, available, orders.iter().flatten());
650
651 // Permute `order` to compensate for projections being lifted as part of
652 // the mfp lifting in `implement_arrangements`.
653 orders
654 .iter_mut()
655 .for_each(|order| super::permute_order(order, &lifted_projections));
656
657 *implementation = JoinImplementation::DeltaQuery(orders);
658
659 super::install_lifted_mfp(&mut new_join, lifted_mfp)?;
660
661 // Hooray done!
662 Ok((new_join, new_arrangements))
663 } else {
664 Err(TransformError::Internal(String::from(
665 "delta_queries::plan call on non-join expression",
666 )))
667 }
668 }
669}
670
671mod differential {
672 use std::collections::BTreeSet;
673
674 use mz_expr::{JoinImplementation, JoinInputMapper, MirRelationExpr, MirScalarExpr};
675 use mz_ore::soft_assert_eq_or_log;
676 use mz_repr::optimize::OptimizerFeatures;
677
678 use crate::TransformError;
679 use crate::join_implementation::FilterCharacteristics;
680
681 /// Creates a linear differential plan, and any predicates that need to be lifted.
682 /// It also returns the number of new arrangements necessary for this plan.
683 pub fn plan(
684 join: &MirRelationExpr,
685 input_mapper: &JoinInputMapper,
686 available: &[Vec<Vec<MirScalarExpr>>],
687 unique_keys: &[Vec<Vec<usize>>],
688 cardinalities: &[Option<usize>],
689 filters: &[FilterCharacteristics],
690 optimizer_features: &OptimizerFeatures,
691 ) -> Result<(MirRelationExpr, usize), TransformError> {
692 let mut new_join = join.clone();
693
694 if let MirRelationExpr::Join {
695 inputs,
696 equivalences,
697 implementation,
698 } = &mut new_join
699 {
700 // We compute one order for each possible starting point, and we will choose one from
701 // these.
702 //
703 // It is an invariant that the orders are in input order: the ith order begins with the ith input.
704 //
705 // We could change this preference at any point, but the list of orders should still inform.
706 // Important, we should choose something stable under re-ordering, to converge under fixed
707 // point iteration; we choose to start with the first input optimizing our criteria, which
708 // should remain stable even when promoted to the first position.
709 let mut orders = super::optimize_orders(
710 equivalences,
711 available,
712 unique_keys,
713 cardinalities,
714 filters,
715 input_mapper,
716 optimizer_features.enable_join_prioritize_arranged,
717 )?;
718
719 // Count new arrangements.
720 //
721 // We collect the count for each input, to be used to calculate `new_arrangements` below.
722 let new_input_arrangements: Vec<usize> = orders
723 .iter()
724 .map(|o| {
725 o.iter()
726 .filter_map(|(c, key, input)| {
727 if c.arranged() {
728 None
729 } else {
730 Some((*input, key.clone()))
731 }
732 })
733 .collect::<BTreeSet<_>>()
734 .len()
735 })
736 .collect();
737
738 // Inside each order, we take the `FilterCharacteristics` from each element, and OR it
739 // to every other element to the right. This is because we are gonna be looking for the
740 // worst `Characteristic` in every order, and for this it makes sense to include a
741 // filter in a `Characteristic` if the filter was applied not just at that input but
742 // any input before. For examples, see chbench.slt Query 02 and 11.
743 orders.iter_mut().for_each(|order| {
744 let mut sum = FilterCharacteristics::none();
745 for (jic, _, _) in order {
746 *jic.filters() |= sum;
747 sum = jic.filters().clone();
748 }
749 });
750
751 // `orders` has one order for each starting collection, and now we have to choose one
752 // from these. First, we find the worst `Characteristics` inside each order, and then we
753 // find the best one among these across all orders, which goes into
754 // `max_min_characteristics`.
755 let max_min_characteristics = orders
756 .iter()
757 .flat_map(|order| order.iter().map(|(c, _, _)| c.clone()).min())
758 .max();
759 let mut order = if let Some(max_min_characteristics) = max_min_characteristics {
760 orders
761 .into_iter()
762 .filter(|o| {
763 o.iter().map(|(c, _, _)| c).min().unwrap() == &max_min_characteristics
764 })
765 // It can happen that `orders` has multiple such orders that have the same worst
766 // `Characteristic` as `max_min_characteristics`. In this case, we go beyond the
767 // worst `Characteristic`: we inspect the entire `Characteristic` vector of each
768 // of these orders, and choose the best among these. This pushes bad stuff to
769 // happen later, by which time we might have applied some filters.
770 .max_by_key(|o| o.clone())
771 .ok_or_else(|| {
772 TransformError::Internal(String::from(
773 "could not find max-min characteristics",
774 ))
775 })?
776 .into_iter()
777 .map(|(c, key, r)| (r, key, Some(c)))
778 .collect::<Vec<_>>()
779 } else {
780 // if max_min_characteristics is None, then there must only be
781 // one input and thus only one order in orders
782 soft_assert_eq_or_log!(orders.len(), 1);
783 orders
784 .remove(0)
785 .into_iter()
786 .map(|(c, key, r)| (r, key, Some(c)))
787 .collect::<Vec<_>>()
788 };
789
790 let (start, mut start_key, start_characteristics) = order[0].clone();
791
792 // Count new arrangements for this choice of ordering.
793 let new_arrangements = inputs.len().saturating_sub(2) + new_input_arrangements[start];
794
795 // Implement arrangements in each of the inputs.
796 let (lifted_mfp, lifted_projections) =
797 super::implement_arrangements(inputs, available, order.iter());
798
799 // Permute `start_key` and `order` to compensate for projections being lifted as part of
800 // the mfp lifting in `implement_arrangements`.
801 if let Some(proj) = &lifted_projections[start] {
802 start_key.iter_mut().for_each(|k| {
803 k.permute(proj);
804 });
805 }
806 super::permute_order(&mut order, &lifted_projections);
807
808 // now that the starting arrangement has been implemented,
809 // remove it from `order` so `order` only contains information
810 // about the other inputs
811 order.remove(0);
812
813 // Install the implementation.
814 *implementation = JoinImplementation::Differential(
815 (start, Some(start_key), start_characteristics),
816 order,
817 );
818
819 super::install_lifted_mfp(&mut new_join, lifted_mfp)?;
820
821 // Hooray done!
822 Ok((new_join, new_arrangements))
823 } else {
824 Err(TransformError::Internal(String::from(
825 "differential::plan call on non-join expression.",
826 )))
827 }
828 }
829}
830
831/// Modify `inputs` to ensure specified arrangements are available.
832///
833/// Lift filter predicates when all needed arrangements are otherwise available.
834///
835/// Returns
836/// - The lifted mfps combined into one mfp.
837/// - Permutations for each input, which were lifted as part of the mfp lifting. These should be
838/// applied to the join order.
839fn implement_arrangements<'a>(
840 inputs: &mut [MirRelationExpr],
841 available_arrangements: &[Vec<Vec<MirScalarExpr>>],
842 needed_arrangements: impl Iterator<
843 Item = &'a (usize, Vec<MirScalarExpr>, Option<JoinInputCharacteristics>),
844 >,
845) -> (MapFilterProject, Vec<Option<Vec<usize>>>) {
846 // Collect needed arrangements by source index.
847 let mut needed = vec![Vec::new(); inputs.len()];
848 for (index, key, _characteristics) in needed_arrangements {
849 needed[*index].push(key.clone());
850 }
851
852 let mut lifted_mfps = vec![None; inputs.len()];
853 let mut lifted_projections = vec![None; inputs.len()];
854
855 // Transform inputs[index] based on needed and available arrangements.
856 // Specifically, lift intervening mfps if all arrangements exist.
857 for (index, needed) in needed.iter_mut().enumerate() {
858 needed.sort();
859 needed.dedup();
860 // We should lift any mfps, iff all arrangements are otherwise available.
861 if !needed.is_empty()
862 && needed
863 .iter()
864 .all(|key| available_arrangements[index].contains(key))
865 {
866 lifted_mfps[index] = Some(MapFilterProject::extract_non_errors_from_expr_mut(
867 &mut inputs[index],
868 ));
869 }
870 // Clean up existing arrangements, and install one with the needed keys.
871 while let MirRelationExpr::ArrangeBy { input: inner, .. } = &mut inputs[index] {
872 inputs[index] = inner.take_dangerous();
873 }
874 // If a mfp was lifted in order to install the arrangement, permute the arrangement and
875 // save the lifted projection.
876 if let Some(lifted_mfp) = &lifted_mfps[index] {
877 let (_, _, project) = lifted_mfp.as_map_filter_project();
878 for arrangement_key in needed.iter_mut() {
879 for k in arrangement_key.iter_mut() {
880 k.permute(&project);
881 }
882 }
883 lifted_projections[index] = Some(project);
884 }
885 if !needed.is_empty() {
886 inputs[index] = MirRelationExpr::arrange_by(inputs[index].take_dangerous(), needed);
887 }
888 }
889
890 // Combine lifted mfps into one.
891 let new_join_mapper = JoinInputMapper::new(inputs);
892 let mut arity = new_join_mapper.total_columns();
893 let combined_mfp = MapFilterProject::new(arity);
894 let mut combined_filter = Vec::new();
895 let mut combined_map = Vec::new();
896 let mut combined_project = Vec::new();
897 for (index, lifted_mfp) in lifted_mfps.into_iter().enumerate() {
898 if let Some(mut lifted_mfp) = lifted_mfp {
899 let column_map = new_join_mapper
900 .local_columns(index)
901 .zip_eq(new_join_mapper.global_columns(index))
902 .collect::<BTreeMap<_, _>>();
903 lifted_mfp.permute_fn(
904 // globalize all input column references
905 |c| column_map[&c],
906 // shift the position of scalars to be after the last input
907 // column
908 arity,
909 );
910 let (mut map, mut filter, mut project) = lifted_mfp.as_map_filter_project();
911 arity += map.len();
912 combined_map.append(&mut map);
913 combined_filter.append(&mut filter);
914 combined_project.append(&mut project);
915 } else {
916 combined_project.extend(new_join_mapper.global_columns(index));
917 }
918 }
919
920 (
921 combined_mfp
922 .map(combined_map)
923 .filter(combined_filter)
924 .project(combined_project),
925 lifted_projections,
926 )
927}
928
929/// This function continues the surgery that `implement_arrangements` started.
930///
931/// (In theory, this function could be merged into `implement_arrangements`, but it would be a bit
932/// painful, because we need to access the join's `implementation` after `implement_arrangements`,
933/// which would be somewhat convoluted after what `install_lifted_mfp` does.)
934///
935/// The given MFP should be the merged MFP from all the lifted inputs MFPs.
936///
937/// `install_lifted_mfp` mutates the given join expression as follows:
938/// - Puts the given MFP on top of the Join.
939/// - Attends to `equivalences`, which was invalidated by `implement_arrangements` if it refers to a
940/// column that was permuted or created by the given MFP.
941/// - Canonicalizes scalar expressions in maps and filters with respect to the join equivalences.
942/// See inline comment for more details.
943fn install_lifted_mfp(
944 new_join: &mut MirRelationExpr,
945 mfp: MapFilterProject,
946) -> Result<(), TransformError> {
947 if !mfp.is_identity() {
948 let (mut map, mut filter, project) = mfp.as_map_filter_project();
949 if let MirRelationExpr::Join { equivalences, .. } = new_join {
950 for equivalence in equivalences.iter_mut() {
951 for expr in equivalence.iter_mut() {
952 // permute `equivalences` in light of the project being lifted
953 expr.permute(&project);
954 // if column references refer to mapped expressions that have been
955 // lifted, replace the column reference with the mapped expression.
956 expr.visit_mut_pre(&mut |e| {
957 // This has to be a loop! This is because it can happen that the new
958 // expression is again a column reference, in which case the visitor
959 // wouldn't be called on it again. (The visitation continues with the
960 // children of the new expression, but won't visit the new expression
961 // itself again.)
962 while let MirScalarExpr::Column(c, _) = e {
963 if *c >= mfp.input_arity {
964 *e = map[*c - mfp.input_arity].clone();
965 } else {
966 break;
967 }
968 }
969 })?;
970 }
971 }
972 // Canonicalize scalar expressions in maps and filters with respect to the join
973 // equivalences. This often makes some filters identical, which are then removed.
974 // The identical filters come from either
975 // - lifting several predicates that originally were pushed down by localizing to more
976 // than one inputs;
977 // - individual IS NOT NULL filters on each of the inputs, which become identical
978 // when rewritten using the join equivalences.
979 // (This allows for almost the same optimizations as when `Demand`
980 // used to insert Projections that were marking some columns to be
981 // identical, when Demand used to run after `JoinImplementation`.)
982 let canonicalizer_map = mz_expr::canonicalize::get_canonicalizer_map(equivalences);
983 for expr in map.iter_mut().chain(filter.iter_mut()) {
984 expr.visit_mut_post(&mut |e| {
985 if let Some(canonical_expr) = canonicalizer_map.get(e) {
986 *e = canonical_expr.clone();
987 }
988 })?
989 }
990 }
991 *new_join = new_join.clone().map(map).filter(filter).project(project);
992 }
993 Ok(())
994}
995
996/// Permute the keys in `order` to compensate for projections being lifted from inputs.
997/// `lifted_projections` has an optional projection for each input.
998fn permute_order(
999 order: &mut Vec<(usize, Vec<MirScalarExpr>, Option<JoinInputCharacteristics>)>,
1000 lifted_projections: &Vec<Option<Vec<usize>>>,
1001) {
1002 order.iter_mut().for_each(|(index, key, _)| {
1003 key.iter_mut().for_each(|kc| {
1004 if let Some(proj) = &lifted_projections[*index] {
1005 kc.permute(proj);
1006 }
1007 })
1008 })
1009}
1010
1011// Computes the best join orders for each input.
1012//
1013// If there are N inputs, returns N orders, with the ith input starting the ith order.
1014fn optimize_orders(
1015 equivalences: &[Vec<MirScalarExpr>], // join equivalences: inside a Vec, the exprs are equivalent
1016 available: &[Vec<Vec<MirScalarExpr>>], // available arrangements per input
1017 unique_keys: &[Vec<Vec<usize>>], // unique keys per input
1018 cardinalities: &[Option<usize>], // cardinalities of input relations
1019 filters: &[FilterCharacteristics], // filter characteristics per input
1020 input_mapper: &JoinInputMapper, // join helper
1021 enable_join_prioritize_arranged: bool,
1022) -> Result<Vec<Vec<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>>, TransformError> {
1023 let mut orderer = Orderer::new(
1024 equivalences,
1025 available,
1026 unique_keys,
1027 cardinalities,
1028 filters,
1029 input_mapper,
1030 enable_join_prioritize_arranged,
1031 );
1032 (0..available.len())
1033 .map(move |i| orderer.optimize_order_for(i))
1034 .collect::<Result<Vec<_>, _>>()
1035}
1036
/// Working state for computing join orders, one starting input at a time.
///
/// The first group of fields is set once in `Orderer::new` and describes the join. The second
/// group is scratch state that `optimize_order_for` resets at the start of every call.
struct Orderer<'a> {
    /// Number of join inputs (the length of `arrangements`).
    inputs: usize,
    /// Join equivalences: the expressions inside one inner `Vec` are equivalent.
    equivalences: &'a [Vec<MirScalarExpr>],
    /// Available arrangement keys, per input.
    arrangements: &'a [Vec<Vec<MirScalarExpr>>],
    /// Unique keys (as lists of column indexes), per input.
    unique_keys: &'a [Vec<Vec<usize>>],
    /// Cardinality of each input, where known.
    cardinalities: &'a [Option<usize>],
    /// Filter characteristics, per input.
    filters: &'a [FilterCharacteristics],
    /// Helper for translating expressions between join-global and input-local columns.
    input_mapper: &'a JoinInputMapper,
    /// For each input, the `(equivalence class index, expression index)` pairs of the
    /// equivalence expressions that reference that input.
    reverse_equivalences: Vec<Vec<(usize, usize)>>,
    /// For each input and each of its arrangements, whether the arrangement key contains all
    /// columns of some unique key of that input.
    unique_arrangement: Vec<Vec<bool>>,

    /// The order built so far in the current run.
    order: Vec<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>,
    /// Whether each input has been placed in the current run.
    placed: Vec<bool>,
    /// For each input, the (input-local) expressions bound so far; kept sorted.
    bound: Vec<Vec<MirScalarExpr>>,
    /// Whether each equivalence class has been activated in the current run.
    equivalences_active: Vec<bool>,
    /// For each input, the positions of its arrangements that were already offered as
    /// candidates in the current run.
    arrangement_active: Vec<Vec<usize>>,
    /// Candidate `(characteristics, key, input)` tuples; the greatest tuple is popped first.
    priority_queue:
        std::collections::BinaryHeap<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>,

    /// Forwarded to every `JoinInputCharacteristics::new` call.
    enable_join_prioritize_arranged: bool,
}
1058
1059impl<'a> Orderer<'a> {
1060 fn new(
1061 equivalences: &'a [Vec<MirScalarExpr>],
1062 arrangements: &'a [Vec<Vec<MirScalarExpr>>],
1063 unique_keys: &'a [Vec<Vec<usize>>],
1064 cardinalities: &'a [Option<usize>],
1065 filters: &'a [FilterCharacteristics],
1066 input_mapper: &'a JoinInputMapper,
1067 enable_join_prioritize_arranged: bool,
1068 ) -> Self {
1069 let inputs = arrangements.len();
1070 // A map from inputs to the equivalence classes in which they are referenced.
1071 let mut reverse_equivalences = vec![Vec::new(); inputs];
1072 for (index, equivalence) in equivalences.iter().enumerate() {
1073 for (index2, expr) in equivalence.iter().enumerate() {
1074 for input in input_mapper.lookup_inputs(expr) {
1075 reverse_equivalences[input].push((index, index2));
1076 }
1077 }
1078 }
1079 // Per-arrangement information about uniqueness of the arrangement key.
1080 let mut unique_arrangement = vec![Vec::new(); inputs];
1081 for (input, keys) in arrangements.iter().enumerate() {
1082 for key in keys.iter() {
1083 unique_arrangement[input].push(unique_keys[input].iter().any(|cols| {
1084 cols.iter()
1085 .all(|c| key.contains(&MirScalarExpr::column(*c)))
1086 }));
1087 }
1088 }
1089
1090 let order = Vec::with_capacity(inputs);
1091 let placed = vec![false; inputs];
1092 let bound = vec![Vec::new(); inputs];
1093 let equivalences_active = vec![false; equivalences.len()];
1094 let arrangement_active = vec![Vec::new(); inputs];
1095 let priority_queue = std::collections::BinaryHeap::new();
1096 Self {
1097 inputs,
1098 equivalences,
1099 arrangements,
1100 unique_keys,
1101 cardinalities,
1102 filters,
1103 input_mapper,
1104 reverse_equivalences,
1105 unique_arrangement,
1106 order,
1107 placed,
1108 bound,
1109 equivalences_active,
1110 arrangement_active,
1111 priority_queue,
1112 enable_join_prioritize_arranged,
1113 }
1114 }
1115
1116 fn optimize_order_for(
1117 &mut self,
1118 start: usize,
1119 ) -> Result<Vec<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>, TransformError> {
1120 self.order.clear();
1121 self.priority_queue.clear();
1122 for input in 0..self.inputs {
1123 self.placed[input] = false;
1124 self.bound[input].clear();
1125 self.arrangement_active[input].clear();
1126 }
1127 for index in 0..self.equivalences.len() {
1128 self.equivalences_active[index] = false;
1129 }
1130
1131 // Introduce cross joins as a possibility.
1132 for input in 0..self.inputs {
1133 let cardinality = self.cardinalities[input];
1134
1135 let is_unique = self.unique_keys[input].iter().any(|cols| cols.is_empty());
1136 if let Some(pos) = self.arrangements[input]
1137 .iter()
1138 .position(|key| key.is_empty())
1139 {
1140 self.arrangement_active[input].push(pos);
1141 self.priority_queue.push((
1142 JoinInputCharacteristics::new(
1143 is_unique,
1144 0,
1145 true,
1146 cardinality,
1147 self.filters[input].clone(),
1148 input,
1149 self.enable_join_prioritize_arranged,
1150 ),
1151 vec![],
1152 input,
1153 ));
1154 } else {
1155 self.priority_queue.push((
1156 JoinInputCharacteristics::new(
1157 is_unique,
1158 0,
1159 false,
1160 cardinality,
1161 self.filters[input].clone(),
1162 input,
1163 self.enable_join_prioritize_arranged,
1164 ),
1165 vec![],
1166 input,
1167 ));
1168 }
1169 }
1170
1171 // Main loop, ordering all the inputs.
1172 if self.inputs > 1 {
1173 self.order_input(start);
1174 while self.order.len() < self.inputs - 1 {
1175 let (characteristics, key, input) = self.priority_queue.pop().unwrap();
1176 // put the tuple into `self.order` unless the tuple with the same
1177 // input is already in `self.order`. For all inputs other than
1178 // start, `self.placed[input]` is an indication of whether a
1179 // corresponding tuple is already in `self.order`.
1180 if !self.placed[input] {
1181 // non-starting inputs are ordered in decreasing priority
1182 self.order.push((characteristics, key, input));
1183 self.order_input(input);
1184 }
1185 }
1186 }
1187
1188 // `order` now contains all the inputs except the first. Let's create an item for the first
1189 // input. We know which input that is, but we need to compute a key and characteristics.
1190 // We start with some default values:
1191 let mut start_tuple = (
1192 JoinInputCharacteristics::new(
1193 false,
1194 0,
1195 false,
1196 self.cardinalities[start],
1197 self.filters[start].clone(),
1198 start,
1199 self.enable_join_prioritize_arranged,
1200 ),
1201 vec![],
1202 start,
1203 );
1204 // The key should line up with the key of the second input (if there is a second input).
1205 // (At this point, `order[0]` is what will eventually be `order[1]`, i.e., the second input.)
1206 if let Some((_, key, second)) = self.order.get(0) {
1207 // for each component of the key of the second input, try to find the corresponding key
1208 // component in the starting input
1209 let candidate_start_key = key
1210 .iter()
1211 .filter_map(|k| {
1212 let k = self.input_mapper.map_expr_to_global(k.clone(), *second);
1213 self.input_mapper
1214 .find_bound_expr(&k, &[start], self.equivalences)
1215 .map(|bound_key| self.input_mapper.map_expr_to_local(bound_key))
1216 })
1217 .collect::<Vec<_>>();
1218 if candidate_start_key.len() == key.len() {
1219 let cardinality = self.cardinalities[start];
1220 let is_unique = self.unique_keys[start].iter().any(|cols| {
1221 cols.iter()
1222 .all(|c| candidate_start_key.contains(&MirScalarExpr::column(*c)))
1223 });
1224 let arranged = self.arrangements[start]
1225 .iter()
1226 .find(|arrangement_key| arrangement_key == &&candidate_start_key)
1227 .is_some();
1228 start_tuple = (
1229 JoinInputCharacteristics::new(
1230 is_unique,
1231 candidate_start_key.len(),
1232 arranged,
1233 cardinality,
1234 self.filters[start].clone(),
1235 start,
1236 self.enable_join_prioritize_arranged,
1237 ),
1238 candidate_start_key,
1239 start,
1240 );
1241 } else {
1242 // For the second input's key fields, there is nothing else to equate it with but
1243 // the fields of the first input, so we should find a match for each of the fields.
1244 // (For a later input, different fields of a key might be equated with fields coming
1245 // from various inputs.)
1246 // Technically, this happens as follows:
1247 // The second input must have been placed in the `priority_queue` either
1248 // 1) as a cross join possibility, or
1249 // 2) when we called `order_input` on the starting input.
1250 // In the 1) case, `key.len()` is 0. In the 2) case, it was the very first call to
1251 // `order_input`, which means that `placed` was true only for the
1252 // starting input, which means that `fully_supported` was true due to
1253 // one of the expressions referring only to the starting input.
1254 let msg = "Unreachable state in join order optimization".to_string();
1255 return Err(TransformError::Internal(msg));
1256 // (This couldn't be a soft_panic: we would form an arrangement with a wrong key.)
1257 }
1258 }
1259 self.order.insert(0, start_tuple);
1260
1261 Ok(std::mem::replace(&mut self.order, Vec::new()))
1262 }
1263
1264 /// Introduces a specific input and keys to the order, along with its characteristics.
1265 ///
1266 /// This method places a next element in the order, and updates the associated state
1267 /// about other candidates, including which columns are now bound and which potential
1268 /// keys are available to consider (both arranged, and unarranged).
1269 fn order_input(&mut self, input: usize) {
1270 self.placed[input] = true;
1271 for (equivalence, expr_index) in self.reverse_equivalences[input].iter() {
1272 if !self.equivalences_active[*equivalence] {
1273 // Placing `input` *may* activate the equivalence. Each of its columns
1274 // come in to scope, which may result in an expression in `equivalence`
1275 // becoming fully defined (when its support is contained in placed inputs)
1276 let fully_supported = self
1277 .input_mapper
1278 .lookup_inputs(&self.equivalences[*equivalence][*expr_index])
1279 .all(|i| self.placed[i]);
1280 if fully_supported {
1281 self.equivalences_active[*equivalence] = true;
1282 for expr in self.equivalences[*equivalence].iter() {
1283 // find the relations that columns in the expression belong to
1284 let mut rels = self.input_mapper.lookup_inputs(expr);
1285 // Skip the expression if
1286 // * the expression is a literal -> this would translate
1287 // to `rels` being empty
1288 // * the expression has columns belonging to more than
1289 // one relation -> TODO: see how we can plan better in
1290 // this case. Arguably, if this happens, it would
1291 // not be unreasonable to ask the user to write the
1292 // query better.
1293 if let Some(rel) = rels.next() {
1294 if rels.next().is_none() {
1295 let expr = self.input_mapper.map_expr_to_local(expr.clone());
1296
1297 // Update bound columns.
1298 self.bound[rel].push(expr);
1299 self.bound[rel].sort();
1300
1301 // Reconsider all available arrangements.
1302 for (pos, key) in self.arrangements[rel].iter().enumerate() {
1303 if !self.arrangement_active[rel].contains(&pos) {
1304 // TODO: support the restoration of the
1305 // following original lines, which have been
1306 // commented out because Materialize may
1307 // panic otherwise. The original line and comments
1308 // here are:
1309 // Determine if the arrangement is viable, which happens when the
1310 // support of its key is all bound.
1311 // if key.iter().all(|k| k.support().iter().all(|c| self.bound[*rel].contains(&ScalarExpr::Column(*c))) {
1312
1313 // Determine if the arrangement is viable,
1314 // which happens when all its key components are bound.
1315 if key.iter().all(|k| self.bound[rel].contains(k)) {
1316 self.arrangement_active[rel].push(pos);
1317 // TODO: This could be pre-computed, as it is independent of the order.
1318 let is_unique = self.unique_arrangement[rel][pos];
1319 self.priority_queue.push((
1320 JoinInputCharacteristics::new(
1321 is_unique,
1322 key.len(),
1323 true,
1324 self.cardinalities[rel],
1325 self.filters[rel].clone(),
1326 rel,
1327 self.enable_join_prioritize_arranged,
1328 ),
1329 key.clone(),
1330 rel,
1331 ));
1332 }
1333 }
1334 }
1335
1336 // does the relation we're joining on have a unique key wrt what's already bound?
1337 let is_unique = self.unique_keys[rel].iter().any(|cols| {
1338 cols.iter().all(|c| {
1339 self.bound[rel].contains(&MirScalarExpr::column(*c))
1340 })
1341 });
1342 self.priority_queue.push((
1343 JoinInputCharacteristics::new(
1344 is_unique,
1345 self.bound[rel].len(),
1346 false,
1347 self.cardinalities[rel],
1348 self.filters[rel].clone(),
1349 rel,
1350 self.enable_join_prioritize_arranged,
1351 ),
1352 self.bound[rel].clone(),
1353 rel,
1354 ));
1355 }
1356 }
1357 }
1358 }
1359 }
1360 }
1361 }
1362}