mz_transform/join_implementation.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// Clippy's cognitive complexity is easy to reach.
11//#![allow(clippy::cognitive_complexity)]
12
13//! Determines the join implementation for join operators.
14//!
15//! This includes determining the type of join (e.g. differential linear, or delta queries),
16//! determining the orders of collections, lifting predicates if useful arrangements exist,
17//! and identifying opportunities to use indexes to replace filters.
18
19use std::collections::BTreeMap;
20
21use mz_expr::JoinImplementation::{Differential, IndexedFilter, Unimplemented};
22use mz_expr::visit::{Visit, VisitChildren};
23use mz_expr::{
24 FilterCharacteristics, Id, JoinInputCharacteristics, JoinInputMapper, MapFilterProject,
25 MirRelationExpr, MirScalarExpr, RECURSION_LIMIT,
26};
27use mz_ore::stack::{CheckedRecursion, RecursionGuard};
28use mz_ore::{soft_assert_or_log, soft_panic_or_log};
29use mz_repr::optimize::OptimizerFeatures;
30
31use crate::analysis::{Cardinality, DerivedBuilder};
32use crate::join_implementation::index_map::IndexMap;
33use crate::predicate_pushdown::PredicatePushdown;
34use crate::{StatisticsOracle, TransformCtx, TransformError};
35
36/// Determines the join implementation for join operators.
37#[derive(Debug)]
38pub struct JoinImplementation {
39 recursion_guard: RecursionGuard,
40}
41
42impl Default for JoinImplementation {
43 /// Construct a new [`JoinImplementation`] where `recursion_guard`
44 /// is initialized with [`RECURSION_LIMIT`] as limit.
45 fn default() -> JoinImplementation {
46 JoinImplementation {
47 recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
48 }
49 }
50}
51
52impl CheckedRecursion for JoinImplementation {
53 fn recursion_guard(&self) -> &RecursionGuard {
54 &self.recursion_guard
55 }
56}
57
58impl crate::Transform for JoinImplementation {
59 fn name(&self) -> &'static str {
60 "JoinImplementation"
61 }
62
63 #[mz_ore::instrument(
64 target = "optimizer",
65 level = "debug",
66 fields(path.segment = "join_implementation")
67 )]
68 fn actually_perform_transform(
69 &self,
70 relation: &mut MirRelationExpr,
71 ctx: &mut TransformCtx,
72 ) -> Result<(), TransformError> {
73 let result = self.action_recursive(
74 relation,
75 &mut IndexMap::new(ctx.indexes),
76 ctx.stats,
77 ctx.features,
78 );
79 mz_repr::explain::trace_plan(&*relation);
80 result
81 }
82}
83
84impl JoinImplementation {
85 /// Pre-order visitor for each `MirRelationExpr` to find join operators.
86 ///
87 /// This method accumulates state about let-bound arrangements, so that
88 /// join operators can more accurately assess their available arrangements.
89 pub fn action_recursive(
90 &self,
91 relation: &mut MirRelationExpr,
92 indexes: &mut IndexMap,
93 stats: &dyn StatisticsOracle,
94 features: &OptimizerFeatures,
95 ) -> Result<(), TransformError> {
96 self.checked_recur(|_| {
97 if let MirRelationExpr::Let { id, value, body } = relation {
98 self.action_recursive(value, indexes, stats, features)?;
99 match &**value {
100 MirRelationExpr::ArrangeBy { keys, .. } => {
101 for key in keys {
102 indexes.add_local(*id, key.clone());
103 }
104 }
105 MirRelationExpr::Reduce { group_key, .. } => {
106 indexes.add_local(
107 *id,
108 (0..group_key.len()).map(MirScalarExpr::Column).collect(),
109 );
110 }
111 _ => {}
112 }
113 self.action_recursive(body, indexes, stats, features)?;
114 indexes.remove_local(*id);
115 Ok(())
116 } else {
117 let (mfp, mfp_input) =
118 MapFilterProject::extract_non_errors_from_expr_ref_mut(relation);
119 mfp_input.try_visit_mut_children(|e| {
120 self.action_recursive(e, indexes, stats, features)
121 })?;
122 self.action(mfp_input, mfp, indexes, stats, features)?;
123 Ok(())
124 }
125 })
126 }
127
128 /// Determines the join implementation for join operators.
129 pub fn action(
130 &self,
131 relation: &mut MirRelationExpr,
132 mfp_above: MapFilterProject,
133 indexes: &IndexMap,
134 stats: &dyn StatisticsOracle,
135 features: &OptimizerFeatures,
136 ) -> Result<(), TransformError> {
137 if let MirRelationExpr::Join {
138 inputs,
139 equivalences,
140 // (Note that `JoinImplementation` runs in a fixpoint loop.)
141 // If the current implementation is
142 // - Unimplemented, then we need to come up with an implementation.
143 // - Differential, then we consider switching to a Delta join, because we might have
144 // inserted some ArrangeBys that create new arrangements when we came up with the
145 // Differential plan, in which case a Delta join might have become viable.
146 // - Delta, then we are good already.
147 // - IndexedFilter, then we just leave that alone, because those are out of scope
148 // for JoinImplementation (they are created by `LiteralConstraints`).
149 // We don't want to change from a Differential plan to an other Differential plan, or
150 // from a Delta plan to an other Delta plan, because the second run cannot distinguish
151 // between an ArrangeBy that marks an already existing arrangement and an ArrangeBy
152 // that was inserted by a previous run of JoinImplementation. (We should eventually
153 // refactor this to make ArrangeBy unambiguous somehow. Maybe move JoinImplementation
154 // to the lowering.)
155 implementation: implementation @ (Unimplemented | Differential(..)),
156 } = relation
157 {
158 // If we eagerly plan delta joins, we don't need the second run to "pick up" delta joins
159 // that could be planned with the arrangements from a differential. If such a delta
160 // join were viable, we'd have already planned it the first time.
161 if features.enable_eager_delta_joins && !matches!(implementation, Unimplemented) {
162 return Ok(());
163 }
164
165 let input_types = inputs.iter().map(|i| i.typ()).collect::<Vec<_>>();
166
167 // Canonicalize the equivalence classes
168 if matches!(implementation, Unimplemented) {
169 // Let's do this only if it's the first run of JoinImplementation, in which case we
170 // are guaranteed to produce a new plan, which will be compatible with the modified
171 // equivalences from the below call. Otherwise, if we already have a Differential or
172 // a Delta join, then we might discard the new plan and go with the old plan, which
173 // was created previously for the old equivalences, and might be invalid for the
174 // modified equivalences from the below call. Note that this issue can arise only if
175 // `canonicalize_equivalences` is not idempotent, which unfortunately seems to be
176 // the case.
177 mz_expr::canonicalize::canonicalize_equivalences(
178 equivalences,
179 input_types.iter().map(|t| &t.column_types),
180 );
181 }
182
183 // Common information of broad utility.
184 let input_mapper = JoinInputMapper::new_from_input_types(&input_types);
185
186 // The first fundamental question is whether we should employ a delta query or not.
187 //
188 // Here we conservatively use the rule that if sufficient arrangements exist we will
189 // use a delta query (except for 2-input joins). (With eager delta joins, we will
190 // settle for fewer arrangements in the delta join than in the differential join.)
191 //
192 // An arrangement is considered available for an input
193 // - if it is a `Get` with columns present in `indexes`,
194 // - or the same wrapped by an IndexedFilter,
195 // - if it is an `ArrangeBy` with the columns present (note that the ArrangeBy might
196 // have been inserted by a previous run of JoinImplementation),
197 // - if it is a `Reduce` whose output is arranged the right way,
198 // - if it is a filter wrapped around either of these (see the mfp extraction).
199 //
200 // The `IndexedFilter` case above is to avoid losing some Delta joins
201 // due to `IndexedFilter` on a join input. This means that in the absolute worst
202 // case (when the `IndexedFilter` doesn't filter out anything), we will fully
203 // re-create some arrangements that we already have for that input. This worst case
204 // is still better than what can happen if we lose a Delta join: Differential joins
205 // will create several new arrangements that doesn't even have a size bound, i.e.,
206 // they might be larger than any user-created index.
207
208 let unique_keys = input_types
209 .into_iter()
210 .map(|typ| typ.keys)
211 .collect::<Vec<_>>();
212 let mut available_arrangements = vec![Vec::new(); inputs.len()];
213 let mut filters = Vec::with_capacity(inputs.len());
214 let mut cardinalities = Vec::with_capacity(inputs.len());
215
216 // We figure out what predicates from mfp_above could be pushed to which input.
217 // We won't actually push these down now; this just informs FilterCharacteristics.
218 let (map, mut filter, _) = mfp_above.as_map_filter_project();
219 let all_errors = filter.iter().all(|p| p.is_literal_err());
220 let (_, pushed_through_map) = PredicatePushdown::push_filters_through_map(
221 &map,
222 &mut filter,
223 mfp_above.input_arity,
224 all_errors,
225 )?;
226 let (_, push_downs) = PredicatePushdown::push_filters_through_join(
227 &input_mapper,
228 equivalences,
229 pushed_through_map,
230 );
231
232 for index in 0..inputs.len() {
233 // We can work around mfps, as we can lift the mfps into the join execution.
234 let (mfp, input) = MapFilterProject::extract_non_errors_from_expr(&inputs[index]);
235 let (_, filter, project) = mfp.as_map_filter_project();
236
237 // We gather filter characteristics:
238 // - From the filter that is directly at the top mfp of the input.
239 // - IndexedFilter joins are constructed from literal equality filters.
240 // - If the input is an ArrangeBy, then we gather filter characteristics from
241 // the mfp below the ArrangeBy. (JoinImplementation often inserts ArrangeBys.)
242 // - From filters that could be pushed down from above the join to this input.
243 // (In LIR, these will be executed right after the join path executes the join
244 // for this input.)
245 // - (No need to look behind Gets, see the inline_mfp argument of RelationCSE.)
246 let mut characteristics = FilterCharacteristics::filter_characteristics(&filter)?;
247 if matches!(
248 input,
249 MirRelationExpr::Join {
250 implementation: IndexedFilter(..),
251 ..
252 }
253 ) {
254 characteristics.add_literal_equality();
255 }
256 if let MirRelationExpr::ArrangeBy {
257 input: arrange_by_input,
258 ..
259 } = input
260 {
261 let (mfp, input) =
262 MapFilterProject::extract_non_errors_from_expr(arrange_by_input);
263 let (_, filter, _) = mfp.as_map_filter_project();
264 characteristics |= FilterCharacteristics::filter_characteristics(&filter)?;
265 if matches!(
266 input,
267 MirRelationExpr::Join {
268 implementation: IndexedFilter(..),
269 ..
270 }
271 ) {
272 characteristics.add_literal_equality();
273 }
274 }
275 let push_down_characteristics =
276 FilterCharacteristics::filter_characteristics(&push_downs[index])?;
277 let push_down_factor = push_down_characteristics.worst_case_scaling_factor();
278 characteristics |= push_down_characteristics;
279
280 // Estimate cardinality
281 if features.enable_cardinality_estimates {
282 let mut builder = DerivedBuilder::new(features);
283 // TODO(mgree): it would be good to not have to copy the statistics here
284 builder.require(Cardinality::with_stats(stats.as_map()));
285 let derived = builder.visit(input);
286
287 let estimate = *derived.as_view().value::<Cardinality>().unwrap();
288 // we've already accounted for the filters _in_ the term; these capture the ones above
289 let scaled = estimate * push_down_factor;
290 cardinalities.push(scaled.rounded());
291 } else {
292 cardinalities.push(None);
293 }
294
295 filters.push(characteristics);
296
297 // Collect available arrangements on this input.
298 match input {
299 MirRelationExpr::Get { id, typ: _, .. } => {
300 available_arrangements[index]
301 .extend(indexes.get(*id).map(|key| key.to_vec()));
302 }
303 MirRelationExpr::ArrangeBy { input, keys } => {
304 // We may use any presented arrangement keys.
305 available_arrangements[index].extend(keys.clone());
306 if let MirRelationExpr::Get { id, typ: _, .. } = &**input {
307 available_arrangements[index]
308 .extend(indexes.get(*id).map(|key| key.to_vec()));
309 }
310 }
311 MirRelationExpr::Reduce { group_key, .. } => {
312 // The first `group_key.len()` columns form an arrangement key.
313 available_arrangements[index]
314 .push((0..group_key.len()).map(MirScalarExpr::Column).collect());
315 }
316 MirRelationExpr::Join {
317 implementation: IndexedFilter(id, ..),
318 ..
319 } => {
320 available_arrangements[index]
321 .extend(indexes.get(Id::Global(id.clone())).map(|key| key.to_vec()));
322 }
323 _ => {}
324 }
325 available_arrangements[index].sort();
326 available_arrangements[index].dedup();
327 let reverse_project = project
328 .into_iter()
329 .enumerate()
330 .map(|(i, c)| (c, i))
331 .collect::<BTreeMap<_, _>>();
332 // Eliminate arrangements referring to columns that have been
333 // projected away by surrounding MFPs.
334 available_arrangements[index].retain(|key| {
335 key.iter()
336 .all(|k| k.support().iter().all(|c| reverse_project.contains_key(c)))
337 });
338 // Permute arrangements so columns reference what is after the MFP.
339 for key in available_arrangements[index].iter_mut() {
340 for k in key.iter_mut() {
341 k.permute_map(&reverse_project);
342 }
343 }
344 // Currently we only support using arrangements all of whose
345 // key components can be found in some equivalence.
346 // Note: because `order_input` currently only finds arrangements
347 // with exact key matches, the code below can be removed with no
348 // change in behavior, but this is being kept for a future
349 // TODO: expand `order_input`
350 available_arrangements[index].retain(|key| {
351 key.iter().all(|k| {
352 let k = input_mapper.map_expr_to_global(k.clone(), index);
353 equivalences
354 .iter()
355 .any(|equivalence| equivalence.contains(&k))
356 })
357 });
358 }
359
360 let old_implementation = implementation.clone();
361 let num_inputs = inputs.len();
362 // We've already planned a differential join... should we replace it with a delta join?
363 //
364 // This code path is only active when `eager_delta_joins` is false.
365 if matches!(old_implementation, Differential(..)) {
366 soft_assert_or_log!(
367 !features.enable_eager_delta_joins,
368 "eager delta joins run join implementation just once"
369 );
370
371 // Binary joins can't be delta joins---give up.
372 if inputs.len() <= 2 {
373 return Ok(());
374 }
375
376 // Only plan a delta join if it's no new arrangements (beyond what differential planned).
377 if let Ok((delta_query_plan, 0)) = delta_queries::plan(
378 relation,
379 &input_mapper,
380 &available_arrangements,
381 &unique_keys,
382 &cardinalities,
383 &filters,
384 features,
385 ) {
386 tracing::debug!(plan = ?delta_query_plan, "replacing differential join with delta join");
387 *relation = delta_query_plan;
388 }
389
390 return Ok(());
391 }
392
393 // To have reached here, we must be in our first run of join planning.
394 //
395 // We plan a differential join first.
396 let (differential_query_plan, differential_new_arrangements) = differential::plan(
397 relation,
398 &input_mapper,
399 &available_arrangements,
400 &unique_keys,
401 &cardinalities,
402 &filters,
403 features,
404 )
405 .expect("Failed to produce a differential join plan");
406
407 // Binary joins _must_ be differential. We won't plan a delta join.
408 if num_inputs <= 2 {
409 // if inputs.len() == 0 then something is very wrong.
410 soft_assert_or_log!(num_inputs != 0, "join with no inputs");
411 // if inputs.len() == 1:
412 // Single input joins are filters and should be planned as
413 // differential plans instead of delta queries. Because a
414 // a filter gets converted into a single input join only when
415 // there are existing arrangements, without this early return,
416 // filters will always be planned as delta queries.
417 // Note: This can actually occur, see github-24511.slt.
418 //
419 // if inputs.len() == 2:
420 // We decided to always plan this as a differential join for now, because the usual
421 // advantage of a Delta join avoiding intermediate arrangements doesn't apply.
422 // See more details here:
423 // https://github.com/MaterializeInc/materialize/pull/16099#issuecomment-1316857374
424 // https://github.com/MaterializeInc/materialize/pull/17708#discussion_r1112848747
425 *relation = differential_query_plan;
426
427 return Ok(());
428 }
429
430 // We are planning a multiway join for the first time.
431 //
432 // We compare the delta and differential join plans.
433 //
434 // A delta query requires that, for every path, there is an arrangement for every
435 // input except for the starting one. Such queries are viable when:
436 //
437 // (a) all the arrangements already exist, or
438 // (b) both:
439 // (i) we wouldn't create more arrangements than a differential join would
440 // (ii) `enable_eager_delta_joins` is on
441 //
442 // A differential join of k relations requires k-2 arrangements of intermediate
443 // results (plus k arrangements of the inputs).
444 //
445 // Consider A ⨝ B ⨝ C ⨝ D. If planned as a differential join, we might have:
446 // A » B » C » D
447 // This corresponds to the tree:
448 //
449 // A B
450 // \ /
451 // ⨝ C
452 // \ /
453 // ⨝ D
454 // \ /
455 // ⨝
456 //
457 // At the two internal joins, the differential join will need two new arrangements.
458 //
459 // TODO(mgree): with this refactoring, we should compute `orders` once---both joins
460 // call `optimize_orders` and we can save some work.
461 match delta_queries::plan(
462 relation,
463 &input_mapper,
464 &available_arrangements,
465 &unique_keys,
466 &cardinalities,
467 &filters,
468 features,
469 ) {
470 // If delta plan's inputs need no new arrangements, pick the delta plan.
471 Ok((delta_query_plan, 0)) => {
472 soft_assert_or_log!(
473 matches!(old_implementation, Unimplemented | Differential(..)),
474 "delta query plans should not be planned twice"
475 );
476 tracing::debug!(
477 plan = ?delta_query_plan,
478 differential_new_arrangements = differential_new_arrangements,
479 "picking delta query plan (no new arrangements)");
480 *relation = delta_query_plan;
481 }
482 // If the delta plan needs new arrangements, compare with the differential plan.
483 Ok((delta_query_plan, delta_new_arrangements)) => {
484 tracing::debug!(
485 delta_new_arrangements = delta_new_arrangements,
486 differential_new_arrangements = differential_new_arrangements,
487 "comparing delta and differential joins",
488 );
489
490 if features.enable_eager_delta_joins
491 && delta_new_arrangements <= differential_new_arrangements
492 {
493 // If we're eagerly planning delta joins, pick the delta plan if it's more economical.
494 tracing::debug!(
495 plan = ?delta_query_plan,
496 "picking delta query plan");
497 *relation = delta_query_plan;
498 } else if let Unimplemented = old_implementation {
499 // If we haven't planned the join yet, use the differential plan.
500 tracing::debug!(
501 plan = ?differential_query_plan,
502 "picking differential query plan");
503 *relation = differential_query_plan;
504 } else {
505 // But don't replace an existing differential plan.
506 tracing::debug!(plan = ?old_implementation, "keeping old plan");
507 soft_assert_or_log!(
508 matches!(old_implementation, Differential(..)),
509 "implemented plan in second run of join implementation should be differential \
510 if the delta plan is not viable"
511 )
512 }
513 }
514 // If we can't plan a delta join, plan a differential join.
515 Err(err) => {
516 soft_panic_or_log!("delta planning failed: {err}");
517 tracing::debug!(
518 plan = ?differential_query_plan,
519 "picking differential query plan (delta planning failed)");
520 *relation = differential_query_plan;
521 }
522 }
523 }
524 Ok(())
525 }
526}
527
528mod index_map {
529 use std::collections::BTreeMap;
530
531 use mz_expr::{Id, LocalId, MirScalarExpr};
532
533 use crate::IndexOracle;
534
535 /// Keeps track of local and global indexes available while descending
536 /// a `MirRelationExpr`.
537 #[derive(Debug)]
538 pub struct IndexMap<'a> {
539 local: BTreeMap<LocalId, Vec<Vec<MirScalarExpr>>>,
540 global: &'a dyn IndexOracle,
541 }
542
543 impl IndexMap<'_> {
544 /// Creates a new index map with knowledge of the provided global indexes.
545 pub fn new(global: &dyn IndexOracle) -> IndexMap {
546 IndexMap {
547 local: BTreeMap::new(),
548 global,
549 }
550 }
551
552 /// Adds a local index on the specified collection with the specified key.
553 pub fn add_local(&mut self, id: LocalId, key: Vec<MirScalarExpr>) {
554 self.local.entry(id).or_default().push(key)
555 }
556
557 /// Removes all local indexes on the specified collection.
558 pub fn remove_local(&mut self, id: LocalId) {
559 self.local.remove(&id);
560 }
561
562 pub fn get(&self, id: Id) -> Box<dyn Iterator<Item = &[MirScalarExpr]> + '_> {
563 match id {
564 Id::Global(id) => Box::new(self.global.indexes_on(id).map(|(_idx_id, key)| key)),
565 Id::Local(id) => Box::new(
566 self.local
567 .get(&id)
568 .into_iter()
569 .flatten()
570 .map(|x| x.as_slice()),
571 ),
572 }
573 }
574 }
575}
576
577mod delta_queries {
578
579 use std::collections::BTreeSet;
580
581 use mz_expr::{
582 FilterCharacteristics, JoinImplementation, JoinInputMapper, MirRelationExpr, MirScalarExpr,
583 };
584 use mz_repr::optimize::OptimizerFeatures;
585
586 use crate::TransformError;
587
588 /// Creates a delta query plan, and any predicates that need to be lifted.
589 /// It also returns the number of new arrangements necessary for this plan.
590 ///
591 /// The method returns `Err` if any errors occur during planning.
592 pub fn plan(
593 join: &MirRelationExpr,
594 input_mapper: &JoinInputMapper,
595 available: &[Vec<Vec<MirScalarExpr>>],
596 unique_keys: &[Vec<Vec<usize>>],
597 cardinalities: &[Option<usize>],
598 filters: &[FilterCharacteristics],
599 optimizer_features: &OptimizerFeatures,
600 ) -> Result<(MirRelationExpr, usize), TransformError> {
601 let mut new_join = join.clone();
602
603 if let MirRelationExpr::Join {
604 inputs,
605 equivalences,
606 implementation,
607 } = &mut new_join
608 {
609 // Determine a viable order for each relation, or return `Err` if none found.
610 let orders = super::optimize_orders(
611 equivalences,
612 available,
613 unique_keys,
614 cardinalities,
615 filters,
616 input_mapper,
617 optimizer_features.enable_join_prioritize_arranged,
618 )?;
619
620 // Count new arrangements.
621 let new_arrangements: usize = orders
622 .iter()
623 .flat_map(|o| {
624 o.iter().skip(1).filter_map(|(c, key, input)| {
625 if c.arranged() {
626 None
627 } else {
628 Some((input, key))
629 }
630 })
631 })
632 .collect::<BTreeSet<_>>()
633 .len();
634
635 // Convert the order information into specific (input, key, characteristics) information.
636 let mut orders = orders
637 .into_iter()
638 .map(|o| {
639 o.into_iter()
640 .skip(1)
641 .map(|(c, key, r)| (r, key, Some(c)))
642 .collect::<Vec<_>>()
643 })
644 .collect::<Vec<_>>();
645
646 // Implement arrangements in each of the inputs.
647 let (lifted_mfp, lifted_projections) =
648 super::implement_arrangements(inputs, available, orders.iter().flatten());
649
650 // Permute `order` to compensate for projections being lifted as part of
651 // the mfp lifting in `implement_arrangements`.
652 orders
653 .iter_mut()
654 .for_each(|order| super::permute_order(order, &lifted_projections));
655
656 *implementation = JoinImplementation::DeltaQuery(orders);
657
658 super::install_lifted_mfp(&mut new_join, lifted_mfp)?;
659
660 // Hooray done!
661 Ok((new_join, new_arrangements))
662 } else {
663 Err(TransformError::Internal(String::from(
664 "delta_queries::plan call on non-join expression",
665 )))
666 }
667 }
668}
669
670mod differential {
671 use std::collections::BTreeSet;
672
673 use mz_expr::{JoinImplementation, JoinInputMapper, MirRelationExpr, MirScalarExpr};
674 use mz_ore::soft_assert_eq_or_log;
675 use mz_repr::optimize::OptimizerFeatures;
676
677 use crate::TransformError;
678 use crate::join_implementation::FilterCharacteristics;
679
680 /// Creates a linear differential plan, and any predicates that need to be lifted.
681 /// It also returns the number of new arrangements necessary for this plan.
682 pub fn plan(
683 join: &MirRelationExpr,
684 input_mapper: &JoinInputMapper,
685 available: &[Vec<Vec<MirScalarExpr>>],
686 unique_keys: &[Vec<Vec<usize>>],
687 cardinalities: &[Option<usize>],
688 filters: &[FilterCharacteristics],
689 optimizer_features: &OptimizerFeatures,
690 ) -> Result<(MirRelationExpr, usize), TransformError> {
691 let mut new_join = join.clone();
692
693 if let MirRelationExpr::Join {
694 inputs,
695 equivalences,
696 implementation,
697 } = &mut new_join
698 {
699 // We compute one order for each possible starting point, and we will choose one from
700 // these.
701 //
702 // It is an invariant that the orders are in input order: the ith order begins with the ith input.
703 //
704 // We could change this preference at any point, but the list of orders should still inform.
705 // Important, we should choose something stable under re-ordering, to converge under fixed
706 // point iteration; we choose to start with the first input optimizing our criteria, which
707 // should remain stable even when promoted to the first position.
708 let mut orders = super::optimize_orders(
709 equivalences,
710 available,
711 unique_keys,
712 cardinalities,
713 filters,
714 input_mapper,
715 optimizer_features.enable_join_prioritize_arranged,
716 )?;
717
718 // Count new arrangements.
719 //
720 // We collect the count for each input, to be used to calculate `new_arrangements` below.
721 let new_input_arrangements: Vec<usize> = orders
722 .iter()
723 .map(|o| {
724 o.iter()
725 .filter_map(|(c, key, input)| {
726 if c.arranged() {
727 None
728 } else {
729 Some((*input, key.clone()))
730 }
731 })
732 .collect::<BTreeSet<_>>()
733 .len()
734 })
735 .collect();
736
737 // Inside each order, we take the `FilterCharacteristics` from each element, and OR it
738 // to every other element to the right. This is because we are gonna be looking for the
739 // worst `Characteristic` in every order, and for this it makes sense to include a
740 // filter in a `Characteristic` if the filter was applied not just at that input but
741 // any input before. For examples, see chbench.slt Query 02 and 11.
742 orders.iter_mut().for_each(|order| {
743 let mut sum = FilterCharacteristics::none();
744 for (jic, _, _) in order {
745 *jic.filters() |= sum;
746 sum = jic.filters().clone();
747 }
748 });
749
750 // `orders` has one order for each starting collection, and now we have to choose one
751 // from these. First, we find the worst `Characteristics` inside each order, and then we
752 // find the best one among these across all orders, which goes into
753 // `max_min_characteristics`.
754 let max_min_characteristics = orders
755 .iter()
756 .flat_map(|order| order.iter().map(|(c, _, _)| c.clone()).min())
757 .max();
758 let mut order = if let Some(max_min_characteristics) = max_min_characteristics {
759 orders
760 .into_iter()
761 .filter(|o| {
762 o.iter().map(|(c, _, _)| c).min().unwrap() == &max_min_characteristics
763 })
764 // It can happen that `orders` has multiple such orders that have the same worst
765 // `Characteristic` as `max_min_characteristics`. In this case, we go beyond the
766 // worst `Characteristic`: we inspect the entire `Characteristic` vector of each
767 // of these orders, and choose the best among these. This pushes bad stuff to
768 // happen later, by which time we might have applied some filters.
769 .max_by_key(|o| o.clone())
770 .ok_or_else(|| {
771 TransformError::Internal(String::from(
772 "could not find max-min characteristics",
773 ))
774 })?
775 .into_iter()
776 .map(|(c, key, r)| (r, key, Some(c)))
777 .collect::<Vec<_>>()
778 } else {
779 // if max_min_characteristics is None, then there must only be
780 // one input and thus only one order in orders
781 soft_assert_eq_or_log!(orders.len(), 1);
782 orders
783 .remove(0)
784 .into_iter()
785 .map(|(c, key, r)| (r, key, Some(c)))
786 .collect::<Vec<_>>()
787 };
788
789 let (start, mut start_key, start_characteristics) = order[0].clone();
790
791 // Count new arrangements for this choice of ordering.
792 let new_arrangements = inputs.len().saturating_sub(2) + new_input_arrangements[start];
793
794 // Implement arrangements in each of the inputs.
795 let (lifted_mfp, lifted_projections) =
796 super::implement_arrangements(inputs, available, order.iter());
797
798 // Permute `start_key` and `order` to compensate for projections being lifted as part of
799 // the mfp lifting in `implement_arrangements`.
800 if let Some(proj) = &lifted_projections[start] {
801 start_key.iter_mut().for_each(|k| {
802 k.permute(proj);
803 });
804 }
805 super::permute_order(&mut order, &lifted_projections);
806
807 // now that the starting arrangement has been implemented,
808 // remove it from `order` so `order` only contains information
809 // about the other inputs
810 order.remove(0);
811
812 // Install the implementation.
813 *implementation = JoinImplementation::Differential(
814 (start, Some(start_key), start_characteristics),
815 order,
816 );
817
818 super::install_lifted_mfp(&mut new_join, lifted_mfp)?;
819
820 // Hooray done!
821 Ok((new_join, new_arrangements))
822 } else {
823 Err(TransformError::Internal(String::from(
824 "differential::plan call on non-join expression.",
825 )))
826 }
827 }
828}
829
830/// Modify `inputs` to ensure specified arrangements are available.
831///
832/// Lift filter predicates when all needed arrangements are otherwise available.
833///
834/// Returns
835/// - The lifted mfps combined into one mfp.
836/// - Permutations for each input, which were lifted as part of the mfp lifting. These should be
837/// applied to the join order.
838fn implement_arrangements<'a>(
839 inputs: &mut [MirRelationExpr],
840 available_arrangements: &[Vec<Vec<MirScalarExpr>>],
841 needed_arrangements: impl Iterator<
842 Item = &'a (usize, Vec<MirScalarExpr>, Option<JoinInputCharacteristics>),
843 >,
844) -> (MapFilterProject, Vec<Option<Vec<usize>>>) {
845 // Collect needed arrangements by source index.
846 let mut needed = vec![Vec::new(); inputs.len()];
847 for (index, key, _characteristics) in needed_arrangements {
848 needed[*index].push(key.clone());
849 }
850
851 let mut lifted_mfps = vec![None; inputs.len()];
852 let mut lifted_projections = vec![None; inputs.len()];
853
854 // Transform inputs[index] based on needed and available arrangements.
855 // Specifically, lift intervening mfps if all arrangements exist.
856 for (index, needed) in needed.iter_mut().enumerate() {
857 needed.sort();
858 needed.dedup();
859 // We should lift any mfps, iff all arrangements are otherwise available.
860 if !needed.is_empty()
861 && needed
862 .iter()
863 .all(|key| available_arrangements[index].contains(key))
864 {
865 lifted_mfps[index] = Some(MapFilterProject::extract_non_errors_from_expr_mut(
866 &mut inputs[index],
867 ));
868 }
869 // Clean up existing arrangements, and install one with the needed keys.
870 while let MirRelationExpr::ArrangeBy { input: inner, .. } = &mut inputs[index] {
871 inputs[index] = inner.take_dangerous();
872 }
873 // If a mfp was lifted in order to install the arrangement, permute the arrangement and
874 // save the lifted projection.
875 if let Some(lifted_mfp) = &lifted_mfps[index] {
876 let (_, _, project) = lifted_mfp.as_map_filter_project();
877 for arrangement_key in needed.iter_mut() {
878 for k in arrangement_key.iter_mut() {
879 k.permute(&project);
880 }
881 }
882 lifted_projections[index] = Some(project);
883 }
884 if !needed.is_empty() {
885 inputs[index] = MirRelationExpr::arrange_by(inputs[index].take_dangerous(), needed);
886 }
887 }
888
889 // Combine lifted mfps into one.
890 let new_join_mapper = JoinInputMapper::new(inputs);
891 let mut arity = new_join_mapper.total_columns();
892 let combined_mfp = MapFilterProject::new(arity);
893 let mut combined_filter = Vec::new();
894 let mut combined_map = Vec::new();
895 let mut combined_project = Vec::new();
896 for (index, lifted_mfp) in lifted_mfps.into_iter().enumerate() {
897 if let Some(mut lifted_mfp) = lifted_mfp {
898 let column_map = new_join_mapper
899 .local_columns(index)
900 .zip(new_join_mapper.global_columns(index))
901 .collect::<BTreeMap<_, _>>();
902 lifted_mfp.permute_fn(
903 // globalize all input column references
904 |c| column_map[&c],
905 // shift the position of scalars to be after the last input
906 // column
907 arity,
908 );
909 let (mut map, mut filter, mut project) = lifted_mfp.as_map_filter_project();
910 arity += map.len();
911 combined_map.append(&mut map);
912 combined_filter.append(&mut filter);
913 combined_project.append(&mut project);
914 } else {
915 combined_project.extend(new_join_mapper.global_columns(index));
916 }
917 }
918
919 (
920 combined_mfp
921 .map(combined_map)
922 .filter(combined_filter)
923 .project(combined_project),
924 lifted_projections,
925 )
926}
927
928/// This function continues the surgery that `implement_arrangements` started.
929///
930/// (In theory, this function could be merged into `implement_arrangements`, but it would be a bit
931/// painful, because we need to access the join's `implementation` after `implement_arrangements`,
932/// which would be somewhat convoluted after what `install_lifted_mfp` does.)
933///
934/// The given MFP should be the merged MFP from all the lifted inputs MFPs.
935///
936/// `install_lifted_mfp` mutates the given join expression as follows:
937/// - Puts the given MFP on top of the Join.
938/// - Attends to `equivalences`, which was invalidated by `implement_arrangements` if it refers to a
939/// column that was permuted or created by the given MFP.
940/// - Canonicalizes scalar expressions in maps and filters with respect to the join equivalences.
941/// See inline comment for more details.
942fn install_lifted_mfp(
943 new_join: &mut MirRelationExpr,
944 mfp: MapFilterProject,
945) -> Result<(), TransformError> {
946 if !mfp.is_identity() {
947 let (mut map, mut filter, project) = mfp.as_map_filter_project();
948 if let MirRelationExpr::Join { equivalences, .. } = new_join {
949 for equivalence in equivalences.iter_mut() {
950 for expr in equivalence.iter_mut() {
951 // permute `equivalences` in light of the project being lifted
952 expr.permute(&project);
953 // if column references refer to mapped expressions that have been
954 // lifted, replace the column reference with the mapped expression.
955 expr.visit_mut_pre(&mut |e| {
956 // This has to be a loop! This is because it can happen that the new
957 // expression is again a column reference, in which case the visitor
958 // wouldn't be called on it again. (The visitation continues with the
959 // children of the new expression, but won't visit the new expression
960 // itself again.)
961 while let MirScalarExpr::Column(c) = e {
962 if *c >= mfp.input_arity {
963 *e = map[*c - mfp.input_arity].clone();
964 } else {
965 break;
966 }
967 }
968 })?;
969 }
970 }
971 // Canonicalize scalar expressions in maps and filters with respect to the join
972 // equivalences. This often makes some filters identical, which are then removed.
973 // The identical filters come from either
974 // - lifting several predicates that originally were pushed down by localizing to more
975 // than one inputs;
976 // - individual IS NOT NULL filters on each of the inputs, which become identical
977 // when rewritten using the join equivalences.
978 // (This allows for almost the same optimizations as when `Demand`
979 // used to insert Projections that were marking some columns to be
980 // identical, when Demand used to run after `JoinImplementation`.)
981 let canonicalizer_map = mz_expr::canonicalize::get_canonicalizer_map(equivalences);
982 for expr in map.iter_mut().chain(filter.iter_mut()) {
983 expr.visit_mut_post(&mut |e| {
984 if let Some(canonical_expr) = canonicalizer_map.get(e) {
985 *e = canonical_expr.clone();
986 }
987 })?
988 }
989 }
990 *new_join = new_join.clone().map(map).filter(filter).project(project);
991 }
992 Ok(())
993}
994
995/// Permute the keys in `order` to compensate for projections being lifted from inputs.
996/// `lifted_projections` has an optional projection for each input.
997fn permute_order(
998 order: &mut Vec<(usize, Vec<MirScalarExpr>, Option<JoinInputCharacteristics>)>,
999 lifted_projections: &Vec<Option<Vec<usize>>>,
1000) {
1001 order.iter_mut().for_each(|(index, key, _)| {
1002 key.iter_mut().for_each(|kc| {
1003 if let Some(proj) = &lifted_projections[*index] {
1004 kc.permute(proj);
1005 }
1006 })
1007 })
1008}
1009
1010// Computes the best join orders for each input.
1011//
1012// If there are N inputs, returns N orders, with the ith input starting the ith order.
1013fn optimize_orders(
1014 equivalences: &[Vec<MirScalarExpr>], // join equivalences: inside a Vec, the exprs are equivalent
1015 available: &[Vec<Vec<MirScalarExpr>>], // available arrangements per input
1016 unique_keys: &[Vec<Vec<usize>>], // unique keys per input
1017 cardinalities: &[Option<usize>], // cardinalities of input relations
1018 filters: &[FilterCharacteristics], // filter characteristics per input
1019 input_mapper: &JoinInputMapper, // join helper
1020 enable_join_prioritize_arranged: bool,
1021) -> Result<Vec<Vec<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>>, TransformError> {
1022 let mut orderer = Orderer::new(
1023 equivalences,
1024 available,
1025 unique_keys,
1026 cardinalities,
1027 filters,
1028 input_mapper,
1029 enable_join_prioritize_arranged,
1030 );
1031 (0..available.len())
1032 .map(move |i| orderer.optimize_order_for(i))
1033 .collect::<Result<Vec<_>, _>>()
1034}
1035
/// State for greedily ordering the inputs of a join.
///
/// `new` fills in the fields that do not depend on the starting input; each call to
/// `optimize_order_for` resets the per-order fields (from `order` onwards) and builds one order.
struct Orderer<'a> {
    /// Number of join inputs (one per entry of `arrangements`).
    inputs: usize,
    /// Join equivalences: the expressions inside one inner `Vec` are equivalent.
    equivalences: &'a [Vec<MirScalarExpr>],
    /// Available arrangement keys, per input.
    arrangements: &'a [Vec<Vec<MirScalarExpr>>],
    /// Unique keys (as lists of local column indexes), per input.
    unique_keys: &'a [Vec<Vec<usize>>],
    /// Optional cardinalities of the input relations.
    cardinalities: &'a [Option<usize>],
    /// Filter characteristics, per input.
    filters: &'a [FilterCharacteristics],
    /// Join helper used to translate between global and per-input column references.
    input_mapper: &'a JoinInputMapper,
    /// For each input, the `(equivalence class index, expression index)` pairs of the
    /// equivalence expressions that reference that input.
    reverse_equivalences: Vec<Vec<(usize, usize)>>,
    /// For each input and each of its available arrangements, whether the arrangement key
    /// contains all columns of one of the input's unique keys.
    unique_arrangement: Vec<Vec<bool>>,

    /// The order under construction, as `(characteristics, key, input)` triples.
    order: Vec<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>,
    /// Whether each input has already been placed.
    placed: Vec<bool>,
    /// Per input, the (input-local) expressions bound by the inputs placed so far; kept sorted.
    bound: Vec<Vec<MirScalarExpr>>,
    /// Whether each equivalence class has been activated by the inputs placed so far.
    equivalences_active: Vec<bool>,
    /// Per input, positions (into `arrangements`) of arrangements already offered as candidates.
    arrangement_active: Vec<Vec<usize>>,
    /// Candidate `(characteristics, key, input)` triples; the greatest one is popped next.
    priority_queue:
        std::collections::BinaryHeap<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>,

    /// Passed through to every `JoinInputCharacteristics::new` call.
    enable_join_prioritize_arranged: bool,
}
1057
1058impl<'a> Orderer<'a> {
1059 fn new(
1060 equivalences: &'a [Vec<MirScalarExpr>],
1061 arrangements: &'a [Vec<Vec<MirScalarExpr>>],
1062 unique_keys: &'a [Vec<Vec<usize>>],
1063 cardinalities: &'a [Option<usize>],
1064 filters: &'a [FilterCharacteristics],
1065 input_mapper: &'a JoinInputMapper,
1066 enable_join_prioritize_arranged: bool,
1067 ) -> Self {
1068 let inputs = arrangements.len();
1069 // A map from inputs to the equivalence classes in which they are referenced.
1070 let mut reverse_equivalences = vec![Vec::new(); inputs];
1071 for (index, equivalence) in equivalences.iter().enumerate() {
1072 for (index2, expr) in equivalence.iter().enumerate() {
1073 for input in input_mapper.lookup_inputs(expr) {
1074 reverse_equivalences[input].push((index, index2));
1075 }
1076 }
1077 }
1078 // Per-arrangement information about uniqueness of the arrangement key.
1079 let mut unique_arrangement = vec![Vec::new(); inputs];
1080 for (input, keys) in arrangements.iter().enumerate() {
1081 for key in keys.iter() {
1082 unique_arrangement[input].push(unique_keys[input].iter().any(|cols| {
1083 cols.iter()
1084 .all(|c| key.contains(&MirScalarExpr::Column(*c)))
1085 }));
1086 }
1087 }
1088
1089 let order = Vec::with_capacity(inputs);
1090 let placed = vec![false; inputs];
1091 let bound = vec![Vec::new(); inputs];
1092 let equivalences_active = vec![false; equivalences.len()];
1093 let arrangement_active = vec![Vec::new(); inputs];
1094 let priority_queue = std::collections::BinaryHeap::new();
1095 Self {
1096 inputs,
1097 equivalences,
1098 arrangements,
1099 unique_keys,
1100 cardinalities,
1101 filters,
1102 input_mapper,
1103 reverse_equivalences,
1104 unique_arrangement,
1105 order,
1106 placed,
1107 bound,
1108 equivalences_active,
1109 arrangement_active,
1110 priority_queue,
1111 enable_join_prioritize_arranged,
1112 }
1113 }
1114
1115 fn optimize_order_for(
1116 &mut self,
1117 start: usize,
1118 ) -> Result<Vec<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>, TransformError> {
1119 self.order.clear();
1120 self.priority_queue.clear();
1121 for input in 0..self.inputs {
1122 self.placed[input] = false;
1123 self.bound[input].clear();
1124 self.arrangement_active[input].clear();
1125 }
1126 for index in 0..self.equivalences.len() {
1127 self.equivalences_active[index] = false;
1128 }
1129
1130 // Introduce cross joins as a possibility.
1131 for input in 0..self.inputs {
1132 let cardinality = self.cardinalities[input];
1133
1134 let is_unique = self.unique_keys[input].iter().any(|cols| cols.is_empty());
1135 if let Some(pos) = self.arrangements[input]
1136 .iter()
1137 .position(|key| key.is_empty())
1138 {
1139 self.arrangement_active[input].push(pos);
1140 self.priority_queue.push((
1141 JoinInputCharacteristics::new(
1142 is_unique,
1143 0,
1144 true,
1145 cardinality,
1146 self.filters[input].clone(),
1147 input,
1148 self.enable_join_prioritize_arranged,
1149 ),
1150 vec![],
1151 input,
1152 ));
1153 } else {
1154 self.priority_queue.push((
1155 JoinInputCharacteristics::new(
1156 is_unique,
1157 0,
1158 false,
1159 cardinality,
1160 self.filters[input].clone(),
1161 input,
1162 self.enable_join_prioritize_arranged,
1163 ),
1164 vec![],
1165 input,
1166 ));
1167 }
1168 }
1169
1170 // Main loop, ordering all the inputs.
1171 if self.inputs > 1 {
1172 self.order_input(start);
1173 while self.order.len() < self.inputs - 1 {
1174 let (characteristics, key, input) = self.priority_queue.pop().unwrap();
1175 // put the tuple into `self.order` unless the tuple with the same
1176 // input is already in `self.order`. For all inputs other than
1177 // start, `self.placed[input]` is an indication of whether a
1178 // corresponding tuple is already in `self.order`.
1179 if !self.placed[input] {
1180 // non-starting inputs are ordered in decreasing priority
1181 self.order.push((characteristics, key, input));
1182 self.order_input(input);
1183 }
1184 }
1185 }
1186
1187 // `order` now contains all the inputs except the first. Let's create an item for the first
1188 // input. We know which input that is, but we need to compute a key and characteristics.
1189 // We start with some default values:
1190 let mut start_tuple = (
1191 JoinInputCharacteristics::new(
1192 false,
1193 0,
1194 false,
1195 self.cardinalities[start],
1196 self.filters[start].clone(),
1197 start,
1198 self.enable_join_prioritize_arranged,
1199 ),
1200 vec![],
1201 start,
1202 );
1203 // The key should line up with the key of the second input (if there is a second input).
1204 // (At this point, `order[0]` is what will eventually be `order[1]`, i.e., the second input.)
1205 if let Some((_, key, second)) = self.order.get(0) {
1206 // for each component of the key of the second input, try to find the corresponding key
1207 // component in the starting input
1208 let candidate_start_key = key
1209 .iter()
1210 .filter_map(|k| {
1211 let k = self.input_mapper.map_expr_to_global(k.clone(), *second);
1212 self.input_mapper
1213 .find_bound_expr(&k, &[start], self.equivalences)
1214 .map(|bound_key| self.input_mapper.map_expr_to_local(bound_key))
1215 })
1216 .collect::<Vec<_>>();
1217 if candidate_start_key.len() == key.len() {
1218 let cardinality = self.cardinalities[start];
1219 let is_unique = self.unique_keys[start].iter().any(|cols| {
1220 cols.iter()
1221 .all(|c| candidate_start_key.contains(&MirScalarExpr::Column(*c)))
1222 });
1223 let arranged = self.arrangements[start]
1224 .iter()
1225 .find(|arrangement_key| arrangement_key == &&candidate_start_key)
1226 .is_some();
1227 start_tuple = (
1228 JoinInputCharacteristics::new(
1229 is_unique,
1230 candidate_start_key.len(),
1231 arranged,
1232 cardinality,
1233 self.filters[start].clone(),
1234 start,
1235 self.enable_join_prioritize_arranged,
1236 ),
1237 candidate_start_key,
1238 start,
1239 );
1240 } else {
1241 // For the second input's key fields, there is nothing else to equate it with but
1242 // the fields of the first input, so we should find a match for each of the fields.
1243 // (For a later input, different fields of a key might be equated with fields coming
1244 // from various inputs.)
1245 // Technically, this happens as follows:
1246 // The second input must have been placed in the `priority_queue` either
1247 // 1) as a cross join possibility, or
1248 // 2) when we called `order_input` on the starting input.
1249 // In the 1) case, `key.len()` is 0. In the 2) case, it was the very first call to
1250 // `order_input`, which means that `placed` was true only for the
1251 // starting input, which means that `fully_supported` was true due to
1252 // one of the expressions referring only to the starting input.
1253 let msg = "Unreachable state in join order optimization".to_string();
1254 return Err(TransformError::Internal(msg));
1255 // (This couldn't be a soft_panic: we would form an arrangement with a wrong key.)
1256 }
1257 }
1258 self.order.insert(0, start_tuple);
1259
1260 Ok(std::mem::replace(&mut self.order, Vec::new()))
1261 }
1262
1263 /// Introduces a specific input and keys to the order, along with its characteristics.
1264 ///
1265 /// This method places a next element in the order, and updates the associated state
1266 /// about other candidates, including which columns are now bound and which potential
1267 /// keys are available to consider (both arranged, and unarranged).
1268 fn order_input(&mut self, input: usize) {
1269 self.placed[input] = true;
1270 for (equivalence, expr_index) in self.reverse_equivalences[input].iter() {
1271 if !self.equivalences_active[*equivalence] {
1272 // Placing `input` *may* activate the equivalence. Each of its columns
1273 // come in to scope, which may result in an expression in `equivalence`
1274 // becoming fully defined (when its support is contained in placed inputs)
1275 let fully_supported = self
1276 .input_mapper
1277 .lookup_inputs(&self.equivalences[*equivalence][*expr_index])
1278 .all(|i| self.placed[i]);
1279 if fully_supported {
1280 self.equivalences_active[*equivalence] = true;
1281 for expr in self.equivalences[*equivalence].iter() {
1282 // find the relations that columns in the expression belong to
1283 let mut rels = self.input_mapper.lookup_inputs(expr);
1284 // Skip the expression if
1285 // * the expression is a literal -> this would translate
1286 // to `rels` being empty
1287 // * the expression has columns belonging to more than
1288 // one relation -> TODO: see how we can plan better in
1289 // this case. Arguably, if this happens, it would
1290 // not be unreasonable to ask the user to write the
1291 // query better.
1292 if let Some(rel) = rels.next() {
1293 if rels.next().is_none() {
1294 let expr = self.input_mapper.map_expr_to_local(expr.clone());
1295
1296 // Update bound columns.
1297 self.bound[rel].push(expr);
1298 self.bound[rel].sort();
1299
1300 // Reconsider all available arrangements.
1301 for (pos, key) in self.arrangements[rel].iter().enumerate() {
1302 if !self.arrangement_active[rel].contains(&pos) {
1303 // TODO: support the restoration of the
1304 // following original lines, which have been
1305 // commented out because Materialize may
1306 // panic otherwise. The original line and comments
1307 // here are:
1308 // Determine if the arrangement is viable, which happens when the
1309 // support of its key is all bound.
1310 // if key.iter().all(|k| k.support().iter().all(|c| self.bound[*rel].contains(&ScalarExpr::Column(*c))) {
1311
1312 // Determine if the arrangement is viable,
1313 // which happens when all its key components are bound.
1314 if key.iter().all(|k| self.bound[rel].contains(k)) {
1315 self.arrangement_active[rel].push(pos);
1316 // TODO: This could be pre-computed, as it is independent of the order.
1317 let is_unique = self.unique_arrangement[rel][pos];
1318 self.priority_queue.push((
1319 JoinInputCharacteristics::new(
1320 is_unique,
1321 key.len(),
1322 true,
1323 self.cardinalities[rel],
1324 self.filters[rel].clone(),
1325 rel,
1326 self.enable_join_prioritize_arranged,
1327 ),
1328 key.clone(),
1329 rel,
1330 ));
1331 }
1332 }
1333 }
1334
1335 // does the relation we're joining on have a unique key wrt what's already bound?
1336 let is_unique = self.unique_keys[rel].iter().any(|cols| {
1337 cols.iter().all(|c| {
1338 self.bound[rel].contains(&MirScalarExpr::Column(*c))
1339 })
1340 });
1341 self.priority_queue.push((
1342 JoinInputCharacteristics::new(
1343 is_unique,
1344 self.bound[rel].len(),
1345 false,
1346 self.cardinalities[rel],
1347 self.filters[rel].clone(),
1348 rel,
1349 self.enable_join_prioritize_arranged,
1350 ),
1351 self.bound[rel].clone(),
1352 rel,
1353 ));
1354 }
1355 }
1356 }
1357 }
1358 }
1359 }
1360 }
1361}