mz_transform/
join_implementation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// Clippy's cognitive complexity is easy to reach.
11//#![allow(clippy::cognitive_complexity)]
12
13//! Determines the join implementation for join operators.
14//!
15//! This includes determining the type of join (e.g. differential linear, or delta queries),
16//! determining the orders of collections, lifting predicates if useful arrangements exist,
17//! and identifying opportunities to use indexes to replace filters.
18
19use std::collections::BTreeMap;
20
21use mz_expr::JoinImplementation::{Differential, IndexedFilter, Unimplemented};
22use mz_expr::visit::{Visit, VisitChildren};
23use mz_expr::{
24    FilterCharacteristics, Id, JoinInputCharacteristics, JoinInputMapper, MapFilterProject,
25    MirRelationExpr, MirScalarExpr, RECURSION_LIMIT,
26};
27use mz_ore::stack::{CheckedRecursion, RecursionGuard};
28use mz_ore::{soft_assert_or_log, soft_panic_or_log};
29use mz_repr::optimize::OptimizerFeatures;
30
31use crate::analysis::{Cardinality, DerivedBuilder};
32use crate::join_implementation::index_map::IndexMap;
33use crate::predicate_pushdown::PredicatePushdown;
34use crate::{StatisticsOracle, TransformCtx, TransformError};
35
36/// Determines the join implementation for join operators.
37#[derive(Debug)]
38pub struct JoinImplementation {
39    recursion_guard: RecursionGuard,
40}
41
42impl Default for JoinImplementation {
43    /// Construct a new [`JoinImplementation`] where `recursion_guard`
44    /// is initialized with [`RECURSION_LIMIT`] as limit.
45    fn default() -> JoinImplementation {
46        JoinImplementation {
47            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
48        }
49    }
50}
51
52impl CheckedRecursion for JoinImplementation {
53    fn recursion_guard(&self) -> &RecursionGuard {
54        &self.recursion_guard
55    }
56}
57
58impl crate::Transform for JoinImplementation {
59    fn name(&self) -> &'static str {
60        "JoinImplementation"
61    }
62
63    #[mz_ore::instrument(
64        target = "optimizer",
65        level = "debug",
66        fields(path.segment = "join_implementation")
67    )]
68    fn actually_perform_transform(
69        &self,
70        relation: &mut MirRelationExpr,
71        ctx: &mut TransformCtx,
72    ) -> Result<(), TransformError> {
73        let result = self.action_recursive(
74            relation,
75            &mut IndexMap::new(ctx.indexes),
76            ctx.stats,
77            ctx.features,
78        );
79        mz_repr::explain::trace_plan(&*relation);
80        result
81    }
82}
83
84impl JoinImplementation {
85    /// Pre-order visitor for each `MirRelationExpr` to find join operators.
86    ///
87    /// This method accumulates state about let-bound arrangements, so that
88    /// join operators can more accurately assess their available arrangements.
89    pub fn action_recursive(
90        &self,
91        relation: &mut MirRelationExpr,
92        indexes: &mut IndexMap,
93        stats: &dyn StatisticsOracle,
94        features: &OptimizerFeatures,
95    ) -> Result<(), TransformError> {
96        self.checked_recur(|_| {
97            if let MirRelationExpr::Let { id, value, body } = relation {
98                self.action_recursive(value, indexes, stats, features)?;
99                match &**value {
100                    MirRelationExpr::ArrangeBy { keys, .. } => {
101                        for key in keys {
102                            indexes.add_local(*id, key.clone());
103                        }
104                    }
105                    MirRelationExpr::Reduce { group_key, .. } => {
106                        indexes.add_local(
107                            *id,
108                            (0..group_key.len()).map(MirScalarExpr::Column).collect(),
109                        );
110                    }
111                    _ => {}
112                }
113                self.action_recursive(body, indexes, stats, features)?;
114                indexes.remove_local(*id);
115                Ok(())
116            } else {
117                let (mfp, mfp_input) =
118                    MapFilterProject::extract_non_errors_from_expr_ref_mut(relation);
119                mfp_input.try_visit_mut_children(|e| {
120                    self.action_recursive(e, indexes, stats, features)
121                })?;
122                self.action(mfp_input, mfp, indexes, stats, features)?;
123                Ok(())
124            }
125        })
126    }
127
128    /// Determines the join implementation for join operators.
129    pub fn action(
130        &self,
131        relation: &mut MirRelationExpr,
132        mfp_above: MapFilterProject,
133        indexes: &IndexMap,
134        stats: &dyn StatisticsOracle,
135        features: &OptimizerFeatures,
136    ) -> Result<(), TransformError> {
137        if let MirRelationExpr::Join {
138            inputs,
139            equivalences,
140            // (Note that `JoinImplementation` runs in a fixpoint loop.)
141            // If the current implementation is
142            //  - Unimplemented, then we need to come up with an implementation.
143            //  - Differential, then we consider switching to a Delta join, because we might have
144            //    inserted some ArrangeBys that create new arrangements when we came up with the
145            //    Differential plan, in which case a Delta join might have become viable.
146            //  - Delta, then we are good already.
147            //  - IndexedFilter, then we just leave that alone, because those are out of scope
148            //    for JoinImplementation (they are created by `LiteralConstraints`).
149            // We don't want to change from a Differential plan to an other Differential plan, or
150            // from a Delta plan to an other Delta plan, because the second run cannot distinguish
151            // between an ArrangeBy that marks an already existing arrangement and an ArrangeBy
152            // that was inserted by a previous run of JoinImplementation. (We should eventually
153            // refactor this to make ArrangeBy unambiguous somehow. Maybe move JoinImplementation
154            // to the lowering.)
155            implementation: implementation @ (Unimplemented | Differential(..)),
156        } = relation
157        {
158            // If we eagerly plan delta joins, we don't need the second run to "pick up" delta joins
159            // that could be planned with the arrangements from a differential. If such a delta
160            // join were viable, we'd have already planned it the first time.
161            if features.enable_eager_delta_joins && !matches!(implementation, Unimplemented) {
162                return Ok(());
163            }
164
165            let input_types = inputs.iter().map(|i| i.typ()).collect::<Vec<_>>();
166
167            // Canonicalize the equivalence classes
168            if matches!(implementation, Unimplemented) {
169                // Let's do this only if it's the first run of JoinImplementation, in which case we
170                // are guaranteed to produce a new plan, which will be compatible with the modified
171                // equivalences from the below call. Otherwise, if we already have a Differential or
172                // a Delta join, then we might discard the new plan and go with the old plan, which
173                // was created previously for the old equivalences, and might be invalid for the
174                // modified equivalences from the below call. Note that this issue can arise only if
175                // `canonicalize_equivalences` is not idempotent, which unfortunately seems to be
176                // the case.
177                mz_expr::canonicalize::canonicalize_equivalences(
178                    equivalences,
179                    input_types.iter().map(|t| &t.column_types),
180                );
181            }
182
183            // Common information of broad utility.
184            let input_mapper = JoinInputMapper::new_from_input_types(&input_types);
185
186            // The first fundamental question is whether we should employ a delta query or not.
187            //
188            // Here we conservatively use the rule that if sufficient arrangements exist we will
189            // use a delta query (except for 2-input joins). (With eager delta joins, we will
190            // settle for fewer arrangements in the delta join than in the differential join.)
191            //
192            // An arrangement is considered available for an input
193            // - if it is a `Get` with columns present in `indexes`,
194            //   - or the same wrapped by an IndexedFilter,
195            // - if it is an `ArrangeBy` with the columns present (note that the ArrangeBy might
196            //   have been inserted by a previous run of JoinImplementation),
197            // - if it is a `Reduce` whose output is arranged the right way,
198            // - if it is a filter wrapped around either of these (see the mfp extraction).
199            //
200            // The `IndexedFilter` case above is to avoid losing some Delta joins
201            // due to `IndexedFilter` on a join input. This means that in the absolute worst
202            // case (when the `IndexedFilter` doesn't filter out anything), we will fully
203            // re-create some arrangements that we already have for that input. This worst case
204            // is still better than what can happen if we lose a Delta join: Differential joins
205            // will create several new arrangements that doesn't even have a size bound, i.e.,
206            // they might be larger than any user-created index.
207
208            let unique_keys = input_types
209                .into_iter()
210                .map(|typ| typ.keys)
211                .collect::<Vec<_>>();
212            let mut available_arrangements = vec![Vec::new(); inputs.len()];
213            let mut filters = Vec::with_capacity(inputs.len());
214            let mut cardinalities = Vec::with_capacity(inputs.len());
215
216            // We figure out what predicates from mfp_above could be pushed to which input.
217            // We won't actually push these down now; this just informs FilterCharacteristics.
218            let (map, mut filter, _) = mfp_above.as_map_filter_project();
219            let all_errors = filter.iter().all(|p| p.is_literal_err());
220            let (_, pushed_through_map) = PredicatePushdown::push_filters_through_map(
221                &map,
222                &mut filter,
223                mfp_above.input_arity,
224                all_errors,
225            )?;
226            let (_, push_downs) = PredicatePushdown::push_filters_through_join(
227                &input_mapper,
228                equivalences,
229                pushed_through_map,
230            );
231
232            for index in 0..inputs.len() {
233                // We can work around mfps, as we can lift the mfps into the join execution.
234                let (mfp, input) = MapFilterProject::extract_non_errors_from_expr(&inputs[index]);
235                let (_, filter, project) = mfp.as_map_filter_project();
236
237                // We gather filter characteristics:
238                // - From the filter that is directly at the top mfp of the input.
239                // - IndexedFilter joins are constructed from literal equality filters.
240                // - If the input is an ArrangeBy, then we gather filter characteristics from
241                //   the mfp below the ArrangeBy. (JoinImplementation often inserts ArrangeBys.)
242                // - From filters that could be pushed down from above the join to this input.
243                //   (In LIR, these will be executed right after the join path executes the join
244                //   for this input.)
245                // - (No need to look behind Gets, see the inline_mfp argument of RelationCSE.)
246                let mut characteristics = FilterCharacteristics::filter_characteristics(&filter)?;
247                if matches!(
248                    input,
249                    MirRelationExpr::Join {
250                        implementation: IndexedFilter(..),
251                        ..
252                    }
253                ) {
254                    characteristics.add_literal_equality();
255                }
256                if let MirRelationExpr::ArrangeBy {
257                    input: arrange_by_input,
258                    ..
259                } = input
260                {
261                    let (mfp, input) =
262                        MapFilterProject::extract_non_errors_from_expr(arrange_by_input);
263                    let (_, filter, _) = mfp.as_map_filter_project();
264                    characteristics |= FilterCharacteristics::filter_characteristics(&filter)?;
265                    if matches!(
266                        input,
267                        MirRelationExpr::Join {
268                            implementation: IndexedFilter(..),
269                            ..
270                        }
271                    ) {
272                        characteristics.add_literal_equality();
273                    }
274                }
275                let push_down_characteristics =
276                    FilterCharacteristics::filter_characteristics(&push_downs[index])?;
277                let push_down_factor = push_down_characteristics.worst_case_scaling_factor();
278                characteristics |= push_down_characteristics;
279
280                // Estimate cardinality
281                if features.enable_cardinality_estimates {
282                    let mut builder = DerivedBuilder::new(features);
283                    // TODO(mgree): it would be good to not have to copy the statistics here
284                    builder.require(Cardinality::with_stats(stats.as_map()));
285                    let derived = builder.visit(input);
286
287                    let estimate = *derived.as_view().value::<Cardinality>().unwrap();
288                    // we've already accounted for the filters _in_ the term; these capture the ones above
289                    let scaled = estimate * push_down_factor;
290                    cardinalities.push(scaled.rounded());
291                } else {
292                    cardinalities.push(None);
293                }
294
295                filters.push(characteristics);
296
297                // Collect available arrangements on this input.
298                match input {
299                    MirRelationExpr::Get { id, typ: _, .. } => {
300                        available_arrangements[index]
301                            .extend(indexes.get(*id).map(|key| key.to_vec()));
302                    }
303                    MirRelationExpr::ArrangeBy { input, keys } => {
304                        // We may use any presented arrangement keys.
305                        available_arrangements[index].extend(keys.clone());
306                        if let MirRelationExpr::Get { id, typ: _, .. } = &**input {
307                            available_arrangements[index]
308                                .extend(indexes.get(*id).map(|key| key.to_vec()));
309                        }
310                    }
311                    MirRelationExpr::Reduce { group_key, .. } => {
312                        // The first `group_key.len()` columns form an arrangement key.
313                        available_arrangements[index]
314                            .push((0..group_key.len()).map(MirScalarExpr::Column).collect());
315                    }
316                    MirRelationExpr::Join {
317                        implementation: IndexedFilter(id, ..),
318                        ..
319                    } => {
320                        available_arrangements[index]
321                            .extend(indexes.get(Id::Global(id.clone())).map(|key| key.to_vec()));
322                    }
323                    _ => {}
324                }
325                available_arrangements[index].sort();
326                available_arrangements[index].dedup();
327                let reverse_project = project
328                    .into_iter()
329                    .enumerate()
330                    .map(|(i, c)| (c, i))
331                    .collect::<BTreeMap<_, _>>();
332                // Eliminate arrangements referring to columns that have been
333                // projected away by surrounding MFPs.
334                available_arrangements[index].retain(|key| {
335                    key.iter()
336                        .all(|k| k.support().iter().all(|c| reverse_project.contains_key(c)))
337                });
338                // Permute arrangements so columns reference what is after the MFP.
339                for key in available_arrangements[index].iter_mut() {
340                    for k in key.iter_mut() {
341                        k.permute_map(&reverse_project);
342                    }
343                }
344                // Currently we only support using arrangements all of whose
345                // key components can be found in some equivalence.
346                // Note: because `order_input` currently only finds arrangements
347                // with exact key matches, the code below can be removed with no
348                // change in behavior, but this is being kept for a future
349                // TODO: expand `order_input`
350                available_arrangements[index].retain(|key| {
351                    key.iter().all(|k| {
352                        let k = input_mapper.map_expr_to_global(k.clone(), index);
353                        equivalences
354                            .iter()
355                            .any(|equivalence| equivalence.contains(&k))
356                    })
357                });
358            }
359
360            let old_implementation = implementation.clone();
361            let num_inputs = inputs.len();
362            // We've already planned a differential join... should we replace it with a delta join?
363            //
364            // This code path is only active when `eager_delta_joins` is false.
365            if matches!(old_implementation, Differential(..)) {
366                soft_assert_or_log!(
367                    !features.enable_eager_delta_joins,
368                    "eager delta joins run join implementation just once"
369                );
370
371                // Binary joins can't be delta joins---give up.
372                if inputs.len() <= 2 {
373                    return Ok(());
374                }
375
376                // Only plan a delta join if it's no new arrangements (beyond what differential planned).
377                if let Ok((delta_query_plan, 0)) = delta_queries::plan(
378                    relation,
379                    &input_mapper,
380                    &available_arrangements,
381                    &unique_keys,
382                    &cardinalities,
383                    &filters,
384                    features,
385                ) {
386                    tracing::debug!(plan = ?delta_query_plan, "replacing differential join with delta join");
387                    *relation = delta_query_plan;
388                }
389
390                return Ok(());
391            }
392
393            // To have reached here, we must be in our first run of join planning.
394            //
395            // We plan a differential join first.
396            let (differential_query_plan, differential_new_arrangements) = differential::plan(
397                relation,
398                &input_mapper,
399                &available_arrangements,
400                &unique_keys,
401                &cardinalities,
402                &filters,
403                features,
404            )
405            .expect("Failed to produce a differential join plan");
406
407            // Binary joins _must_ be differential. We won't plan a delta join.
408            if num_inputs <= 2 {
409                // if inputs.len() == 0 then something is very wrong.
410                soft_assert_or_log!(num_inputs != 0, "join with no inputs");
411                // if inputs.len() == 1:
412                // Single input joins are filters and should be planned as
413                // differential plans instead of delta queries. Because a
414                // a filter gets converted into a single input join only when
415                // there are existing arrangements, without this early return,
416                // filters will always be planned as delta queries.
417                // Note: This can actually occur, see github-24511.slt.
418                //
419                // if inputs.len() == 2:
420                // We decided to always plan this as a differential join for now, because the usual
421                // advantage of a Delta join avoiding intermediate arrangements doesn't apply.
422                // See more details here:
423                // https://github.com/MaterializeInc/materialize/pull/16099#issuecomment-1316857374
424                // https://github.com/MaterializeInc/materialize/pull/17708#discussion_r1112848747
425                *relation = differential_query_plan;
426
427                return Ok(());
428            }
429
430            // We are planning a multiway join for the first time.
431            //
432            // We compare the delta and differential join plans.
433            //
434            // A delta query requires that, for every path, there is an arrangement for every
435            // input except for the starting one. Such queries are viable when:
436            //
437            //   (a) all the arrangements already exist, or
438            //   (b) both:
439            //       (i) we wouldn't create more arrangements than a differential join would
440            //       (ii) `enable_eager_delta_joins` is on
441            //
442            // A differential join of k relations requires k-2 arrangements of intermediate
443            // results (plus k arrangements of the inputs).
444            //
445            // Consider A ⨝ B ⨝ C ⨝ D. If planned as a differential join, we might have:
446            //          A » B » C » D
447            // This corresponds to the tree:
448            //
449            // A   B
450            //  \ /
451            //   ⨝   C
452            //    \ /
453            //     ⨝   D
454            //      \ /
455            //       ⨝
456            //
457            // At the two internal joins, the differential join will need two new arrangements.
458            //
459            // TODO(mgree): with this refactoring, we should compute `orders` once---both joins
460            //              call `optimize_orders` and we can save some work.
461            match delta_queries::plan(
462                relation,
463                &input_mapper,
464                &available_arrangements,
465                &unique_keys,
466                &cardinalities,
467                &filters,
468                features,
469            ) {
470                // If delta plan's inputs need no new arrangements, pick the delta plan.
471                Ok((delta_query_plan, 0)) => {
472                    soft_assert_or_log!(
473                        matches!(old_implementation, Unimplemented | Differential(..)),
474                        "delta query plans should not be planned twice"
475                    );
476                    tracing::debug!(
477                        plan = ?delta_query_plan,
478                        differential_new_arrangements = differential_new_arrangements,
479                        "picking delta query plan (no new arrangements)");
480                    *relation = delta_query_plan;
481                }
482                // If the delta plan needs new arrangements, compare with the differential plan.
483                Ok((delta_query_plan, delta_new_arrangements)) => {
484                    tracing::debug!(
485                        delta_new_arrangements = delta_new_arrangements,
486                        differential_new_arrangements = differential_new_arrangements,
487                        "comparing delta and differential joins",
488                    );
489
490                    if features.enable_eager_delta_joins
491                        && delta_new_arrangements <= differential_new_arrangements
492                    {
493                        // If we're eagerly planning delta joins, pick the delta plan if it's more economical.
494                        tracing::debug!(
495                            plan = ?delta_query_plan,
496                            "picking delta query plan");
497                        *relation = delta_query_plan;
498                    } else if let Unimplemented = old_implementation {
499                        // If we haven't planned the join yet, use the differential plan.
500                        tracing::debug!(
501                            plan = ?differential_query_plan,
502                            "picking differential query plan");
503                        *relation = differential_query_plan;
504                    } else {
505                        // But don't replace an existing differential plan.
506                        tracing::debug!(plan = ?old_implementation, "keeping old plan");
507                        soft_assert_or_log!(
508                            matches!(old_implementation, Differential(..)),
509                            "implemented plan in second run of join implementation should be differential \
510                             if the delta plan is not viable"
511                        )
512                    }
513                }
514                // If we can't plan a delta join, plan a differential join.
515                Err(err) => {
516                    soft_panic_or_log!("delta planning failed: {err}");
517                    tracing::debug!(
518                        plan = ?differential_query_plan,
519                        "picking differential query plan (delta planning failed)");
520                    *relation = differential_query_plan;
521                }
522            }
523        }
524        Ok(())
525    }
526}
527
528mod index_map {
529    use std::collections::BTreeMap;
530
531    use mz_expr::{Id, LocalId, MirScalarExpr};
532
533    use crate::IndexOracle;
534
535    /// Keeps track of local and global indexes available while descending
536    /// a `MirRelationExpr`.
537    #[derive(Debug)]
538    pub struct IndexMap<'a> {
539        local: BTreeMap<LocalId, Vec<Vec<MirScalarExpr>>>,
540        global: &'a dyn IndexOracle,
541    }
542
543    impl IndexMap<'_> {
544        /// Creates a new index map with knowledge of the provided global indexes.
545        pub fn new(global: &dyn IndexOracle) -> IndexMap {
546            IndexMap {
547                local: BTreeMap::new(),
548                global,
549            }
550        }
551
552        /// Adds a local index on the specified collection with the specified key.
553        pub fn add_local(&mut self, id: LocalId, key: Vec<MirScalarExpr>) {
554            self.local.entry(id).or_default().push(key)
555        }
556
557        /// Removes all local indexes on the specified collection.
558        pub fn remove_local(&mut self, id: LocalId) {
559            self.local.remove(&id);
560        }
561
562        pub fn get(&self, id: Id) -> Box<dyn Iterator<Item = &[MirScalarExpr]> + '_> {
563            match id {
564                Id::Global(id) => Box::new(self.global.indexes_on(id).map(|(_idx_id, key)| key)),
565                Id::Local(id) => Box::new(
566                    self.local
567                        .get(&id)
568                        .into_iter()
569                        .flatten()
570                        .map(|x| x.as_slice()),
571                ),
572            }
573        }
574    }
575}
576
577mod delta_queries {
578
579    use std::collections::BTreeSet;
580
581    use mz_expr::{
582        FilterCharacteristics, JoinImplementation, JoinInputMapper, MirRelationExpr, MirScalarExpr,
583    };
584    use mz_repr::optimize::OptimizerFeatures;
585
586    use crate::TransformError;
587
588    /// Creates a delta query plan, and any predicates that need to be lifted.
589    /// It also returns the number of new arrangements necessary for this plan.
590    ///
591    /// The method returns `Err` if any errors occur during planning.
592    pub fn plan(
593        join: &MirRelationExpr,
594        input_mapper: &JoinInputMapper,
595        available: &[Vec<Vec<MirScalarExpr>>],
596        unique_keys: &[Vec<Vec<usize>>],
597        cardinalities: &[Option<usize>],
598        filters: &[FilterCharacteristics],
599        optimizer_features: &OptimizerFeatures,
600    ) -> Result<(MirRelationExpr, usize), TransformError> {
601        let mut new_join = join.clone();
602
603        if let MirRelationExpr::Join {
604            inputs,
605            equivalences,
606            implementation,
607        } = &mut new_join
608        {
609            // Determine a viable order for each relation, or return `Err` if none found.
610            let orders = super::optimize_orders(
611                equivalences,
612                available,
613                unique_keys,
614                cardinalities,
615                filters,
616                input_mapper,
617                optimizer_features.enable_join_prioritize_arranged,
618            )?;
619
620            // Count new arrangements.
621            let new_arrangements: usize = orders
622                .iter()
623                .flat_map(|o| {
624                    o.iter().skip(1).filter_map(|(c, key, input)| {
625                        if c.arranged() {
626                            None
627                        } else {
628                            Some((input, key))
629                        }
630                    })
631                })
632                .collect::<BTreeSet<_>>()
633                .len();
634
635            // Convert the order information into specific (input, key, characteristics) information.
636            let mut orders = orders
637                .into_iter()
638                .map(|o| {
639                    o.into_iter()
640                        .skip(1)
641                        .map(|(c, key, r)| (r, key, Some(c)))
642                        .collect::<Vec<_>>()
643                })
644                .collect::<Vec<_>>();
645
646            // Implement arrangements in each of the inputs.
647            let (lifted_mfp, lifted_projections) =
648                super::implement_arrangements(inputs, available, orders.iter().flatten());
649
650            // Permute `order` to compensate for projections being lifted as part of
651            // the mfp lifting in `implement_arrangements`.
652            orders
653                .iter_mut()
654                .for_each(|order| super::permute_order(order, &lifted_projections));
655
656            *implementation = JoinImplementation::DeltaQuery(orders);
657
658            super::install_lifted_mfp(&mut new_join, lifted_mfp)?;
659
660            // Hooray done!
661            Ok((new_join, new_arrangements))
662        } else {
663            Err(TransformError::Internal(String::from(
664                "delta_queries::plan call on non-join expression",
665            )))
666        }
667    }
668}
669
670mod differential {
671    use std::collections::BTreeSet;
672
673    use mz_expr::{JoinImplementation, JoinInputMapper, MirRelationExpr, MirScalarExpr};
674    use mz_ore::soft_assert_eq_or_log;
675    use mz_repr::optimize::OptimizerFeatures;
676
677    use crate::TransformError;
678    use crate::join_implementation::FilterCharacteristics;
679
680    /// Creates a linear differential plan, and any predicates that need to be lifted.
681    /// It also returns the number of new arrangements necessary for this plan.
682    pub fn plan(
683        join: &MirRelationExpr,
684        input_mapper: &JoinInputMapper,
685        available: &[Vec<Vec<MirScalarExpr>>],
686        unique_keys: &[Vec<Vec<usize>>],
687        cardinalities: &[Option<usize>],
688        filters: &[FilterCharacteristics],
689        optimizer_features: &OptimizerFeatures,
690    ) -> Result<(MirRelationExpr, usize), TransformError> {
691        let mut new_join = join.clone();
692
693        if let MirRelationExpr::Join {
694            inputs,
695            equivalences,
696            implementation,
697        } = &mut new_join
698        {
699            // We compute one order for each possible starting point, and we will choose one from
700            // these.
701            //
702            // It is an invariant that the orders are in input order: the ith order begins with the ith input.
703            //
704            // We could change this preference at any point, but the list of orders should still inform.
705            // Important, we should choose something stable under re-ordering, to converge under fixed
706            // point iteration; we choose to start with the first input optimizing our criteria, which
707            // should remain stable even when promoted to the first position.
708            let mut orders = super::optimize_orders(
709                equivalences,
710                available,
711                unique_keys,
712                cardinalities,
713                filters,
714                input_mapper,
715                optimizer_features.enable_join_prioritize_arranged,
716            )?;
717
718            // Count new arrangements.
719            //
720            // We collect the count for each input, to be used to calculate `new_arrangements` below.
721            let new_input_arrangements: Vec<usize> = orders
722                .iter()
723                .map(|o| {
724                    o.iter()
725                        .filter_map(|(c, key, input)| {
726                            if c.arranged() {
727                                None
728                            } else {
729                                Some((*input, key.clone()))
730                            }
731                        })
732                        .collect::<BTreeSet<_>>()
733                        .len()
734                })
735                .collect();
736
737            // Inside each order, we take the `FilterCharacteristics` from each element, and OR it
738            // to every other element to the right. This is because we are gonna be looking for the
739            // worst `Characteristic` in every order, and for this it makes sense to include a
740            // filter in a `Characteristic` if the filter was applied not just at that input but
741            // any input before. For examples, see chbench.slt Query 02 and 11.
742            orders.iter_mut().for_each(|order| {
743                let mut sum = FilterCharacteristics::none();
744                for (jic, _, _) in order {
745                    *jic.filters() |= sum;
746                    sum = jic.filters().clone();
747                }
748            });
749
750            // `orders` has one order for each starting collection, and now we have to choose one
751            // from these. First, we find the worst `Characteristics` inside each order, and then we
752            // find the best one among these across all orders, which goes into
753            // `max_min_characteristics`.
754            let max_min_characteristics = orders
755                .iter()
756                .flat_map(|order| order.iter().map(|(c, _, _)| c.clone()).min())
757                .max();
758            let mut order = if let Some(max_min_characteristics) = max_min_characteristics {
759                orders
760                    .into_iter()
761                    .filter(|o| {
762                        o.iter().map(|(c, _, _)| c).min().unwrap() == &max_min_characteristics
763                    })
764                    // It can happen that `orders` has multiple such orders that have the same worst
765                    // `Characteristic` as `max_min_characteristics`. In this case, we go beyond the
766                    // worst `Characteristic`: we inspect the entire `Characteristic` vector of each
767                    // of these orders, and choose the best among these. This pushes bad stuff to
768                    // happen later, by which time we might have applied some filters.
769                    .max_by_key(|o| o.clone())
770                    .ok_or_else(|| {
771                        TransformError::Internal(String::from(
772                            "could not find max-min characteristics",
773                        ))
774                    })?
775                    .into_iter()
776                    .map(|(c, key, r)| (r, key, Some(c)))
777                    .collect::<Vec<_>>()
778            } else {
779                // if max_min_characteristics is None, then there must only be
780                // one input and thus only one order in orders
781                soft_assert_eq_or_log!(orders.len(), 1);
782                orders
783                    .remove(0)
784                    .into_iter()
785                    .map(|(c, key, r)| (r, key, Some(c)))
786                    .collect::<Vec<_>>()
787            };
788
789            let (start, mut start_key, start_characteristics) = order[0].clone();
790
791            // Count new arrangements for this choice of ordering.
792            let new_arrangements = inputs.len().saturating_sub(2) + new_input_arrangements[start];
793
794            // Implement arrangements in each of the inputs.
795            let (lifted_mfp, lifted_projections) =
796                super::implement_arrangements(inputs, available, order.iter());
797
798            // Permute `start_key` and `order` to compensate for projections being lifted as part of
799            // the mfp lifting in `implement_arrangements`.
800            if let Some(proj) = &lifted_projections[start] {
801                start_key.iter_mut().for_each(|k| {
802                    k.permute(proj);
803                });
804            }
805            super::permute_order(&mut order, &lifted_projections);
806
807            // now that the starting arrangement has been implemented,
808            // remove it from `order` so `order` only contains information
809            // about the other inputs
810            order.remove(0);
811
812            // Install the implementation.
813            *implementation = JoinImplementation::Differential(
814                (start, Some(start_key), start_characteristics),
815                order,
816            );
817
818            super::install_lifted_mfp(&mut new_join, lifted_mfp)?;
819
820            // Hooray done!
821            Ok((new_join, new_arrangements))
822        } else {
823            Err(TransformError::Internal(String::from(
824                "differential::plan call on non-join expression.",
825            )))
826        }
827    }
828}
829
830/// Modify `inputs` to ensure specified arrangements are available.
831///
832/// Lift filter predicates when all needed arrangements are otherwise available.
833///
834/// Returns
835///  - The lifted mfps combined into one mfp.
836///  - Permutations for each input, which were lifted as part of the mfp lifting. These should be
837///    applied to the join order.
838fn implement_arrangements<'a>(
839    inputs: &mut [MirRelationExpr],
840    available_arrangements: &[Vec<Vec<MirScalarExpr>>],
841    needed_arrangements: impl Iterator<
842        Item = &'a (usize, Vec<MirScalarExpr>, Option<JoinInputCharacteristics>),
843    >,
844) -> (MapFilterProject, Vec<Option<Vec<usize>>>) {
845    // Collect needed arrangements by source index.
846    let mut needed = vec![Vec::new(); inputs.len()];
847    for (index, key, _characteristics) in needed_arrangements {
848        needed[*index].push(key.clone());
849    }
850
851    let mut lifted_mfps = vec![None; inputs.len()];
852    let mut lifted_projections = vec![None; inputs.len()];
853
854    // Transform inputs[index] based on needed and available arrangements.
855    // Specifically, lift intervening mfps if all arrangements exist.
856    for (index, needed) in needed.iter_mut().enumerate() {
857        needed.sort();
858        needed.dedup();
859        // We should lift any mfps, iff all arrangements are otherwise available.
860        if !needed.is_empty()
861            && needed
862                .iter()
863                .all(|key| available_arrangements[index].contains(key))
864        {
865            lifted_mfps[index] = Some(MapFilterProject::extract_non_errors_from_expr_mut(
866                &mut inputs[index],
867            ));
868        }
869        // Clean up existing arrangements, and install one with the needed keys.
870        while let MirRelationExpr::ArrangeBy { input: inner, .. } = &mut inputs[index] {
871            inputs[index] = inner.take_dangerous();
872        }
873        // If a mfp was lifted in order to install the arrangement, permute the arrangement and
874        // save the lifted projection.
875        if let Some(lifted_mfp) = &lifted_mfps[index] {
876            let (_, _, project) = lifted_mfp.as_map_filter_project();
877            for arrangement_key in needed.iter_mut() {
878                for k in arrangement_key.iter_mut() {
879                    k.permute(&project);
880                }
881            }
882            lifted_projections[index] = Some(project);
883        }
884        if !needed.is_empty() {
885            inputs[index] = MirRelationExpr::arrange_by(inputs[index].take_dangerous(), needed);
886        }
887    }
888
889    // Combine lifted mfps into one.
890    let new_join_mapper = JoinInputMapper::new(inputs);
891    let mut arity = new_join_mapper.total_columns();
892    let combined_mfp = MapFilterProject::new(arity);
893    let mut combined_filter = Vec::new();
894    let mut combined_map = Vec::new();
895    let mut combined_project = Vec::new();
896    for (index, lifted_mfp) in lifted_mfps.into_iter().enumerate() {
897        if let Some(mut lifted_mfp) = lifted_mfp {
898            let column_map = new_join_mapper
899                .local_columns(index)
900                .zip(new_join_mapper.global_columns(index))
901                .collect::<BTreeMap<_, _>>();
902            lifted_mfp.permute_fn(
903                // globalize all input column references
904                |c| column_map[&c],
905                // shift the position of scalars to be after the last input
906                // column
907                arity,
908            );
909            let (mut map, mut filter, mut project) = lifted_mfp.as_map_filter_project();
910            arity += map.len();
911            combined_map.append(&mut map);
912            combined_filter.append(&mut filter);
913            combined_project.append(&mut project);
914        } else {
915            combined_project.extend(new_join_mapper.global_columns(index));
916        }
917    }
918
919    (
920        combined_mfp
921            .map(combined_map)
922            .filter(combined_filter)
923            .project(combined_project),
924        lifted_projections,
925    )
926}
927
928/// This function continues the surgery that `implement_arrangements` started.
929///
930/// (In theory, this function could be merged into `implement_arrangements`, but it would be a bit
931/// painful, because we need to access the join's `implementation` after `implement_arrangements`,
932/// which would be somewhat convoluted after what `install_lifted_mfp` does.)
933///
934/// The given MFP should be the merged MFP from all the lifted inputs MFPs.
935///
936/// `install_lifted_mfp` mutates the given join expression as follows:
937/// - Puts the given MFP on top of the Join.
938/// - Attends to `equivalences`, which was invalidated by `implement_arrangements` if it refers to a
939///   column that was permuted or created by the given MFP.
940/// - Canonicalizes scalar expressions in maps and filters with respect to the join equivalences.
941///   See inline comment for more details.
942fn install_lifted_mfp(
943    new_join: &mut MirRelationExpr,
944    mfp: MapFilterProject,
945) -> Result<(), TransformError> {
946    if !mfp.is_identity() {
947        let (mut map, mut filter, project) = mfp.as_map_filter_project();
948        if let MirRelationExpr::Join { equivalences, .. } = new_join {
949            for equivalence in equivalences.iter_mut() {
950                for expr in equivalence.iter_mut() {
951                    // permute `equivalences` in light of the project being lifted
952                    expr.permute(&project);
953                    // if column references refer to mapped expressions that have been
954                    // lifted, replace the column reference with the mapped expression.
955                    expr.visit_mut_pre(&mut |e| {
956                        // This has to be a loop! This is because it can happen that the new
957                        // expression is again a column reference, in which case the visitor
958                        // wouldn't be called on it again. (The visitation continues with the
959                        // children of the new expression, but won't visit the new expression
960                        // itself again.)
961                        while let MirScalarExpr::Column(c) = e {
962                            if *c >= mfp.input_arity {
963                                *e = map[*c - mfp.input_arity].clone();
964                            } else {
965                                break;
966                            }
967                        }
968                    })?;
969                }
970            }
971            // Canonicalize scalar expressions in maps and filters with respect to the join
972            // equivalences. This often makes some filters identical, which are then removed.
973            // The identical filters come from either
974            //  - lifting several predicates that originally were pushed down by localizing to more
975            //    than one inputs;
976            //  - individual IS NOT NULL filters on each of the inputs, which become identical
977            //    when rewritten using the join equivalences.
978            //  (This allows for almost the same optimizations as when `Demand`
979            //  used to insert Projections that were marking some columns to be
980            //  identical, when Demand used to run after `JoinImplementation`.)
981            let canonicalizer_map = mz_expr::canonicalize::get_canonicalizer_map(equivalences);
982            for expr in map.iter_mut().chain(filter.iter_mut()) {
983                expr.visit_mut_post(&mut |e| {
984                    if let Some(canonical_expr) = canonicalizer_map.get(e) {
985                        *e = canonical_expr.clone();
986                    }
987                })?
988            }
989        }
990        *new_join = new_join.clone().map(map).filter(filter).project(project);
991    }
992    Ok(())
993}
994
995/// Permute the keys in `order` to compensate for projections being lifted from inputs.
996/// `lifted_projections` has an optional projection for each input.
997fn permute_order(
998    order: &mut Vec<(usize, Vec<MirScalarExpr>, Option<JoinInputCharacteristics>)>,
999    lifted_projections: &Vec<Option<Vec<usize>>>,
1000) {
1001    order.iter_mut().for_each(|(index, key, _)| {
1002        key.iter_mut().for_each(|kc| {
1003            if let Some(proj) = &lifted_projections[*index] {
1004                kc.permute(proj);
1005            }
1006        })
1007    })
1008}
1009
1010// Computes the best join orders for each input.
1011//
1012// If there are N inputs, returns N orders, with the ith input starting the ith order.
1013fn optimize_orders(
1014    equivalences: &[Vec<MirScalarExpr>], // join equivalences: inside a Vec, the exprs are equivalent
1015    available: &[Vec<Vec<MirScalarExpr>>], // available arrangements per input
1016    unique_keys: &[Vec<Vec<usize>>],     // unique keys per input
1017    cardinalities: &[Option<usize>],     // cardinalities of input relations
1018    filters: &[FilterCharacteristics],   // filter characteristics per input
1019    input_mapper: &JoinInputMapper,      // join helper
1020    enable_join_prioritize_arranged: bool,
1021) -> Result<Vec<Vec<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>>, TransformError> {
1022    let mut orderer = Orderer::new(
1023        equivalences,
1024        available,
1025        unique_keys,
1026        cardinalities,
1027        filters,
1028        input_mapper,
1029        enable_join_prioritize_arranged,
1030    );
1031    (0..available.len())
1032        .map(move |i| orderer.optimize_order_for(i))
1033        .collect::<Result<Vec<_>, _>>()
1034}
1035
/// Working state for computing join orders, one starting input at a time.
///
/// The first group of fields is fixed at construction time; the second group is scratch state
/// that `optimize_order_for` resets at the start of every call.
struct Orderer<'a> {
    /// The number of join inputs (the length of `arrangements`).
    inputs: usize,
    /// The join equivalences: inside a `Vec`, the expressions are equivalent.
    equivalences: &'a [Vec<MirScalarExpr>],
    /// The available arrangements (keys) per input.
    arrangements: &'a [Vec<Vec<MirScalarExpr>>],
    /// The unique keys per input.
    unique_keys: &'a [Vec<Vec<usize>>],
    /// The cardinalities of the input relations.
    cardinalities: &'a [Option<usize>],
    /// The filter characteristics per input.
    filters: &'a [FilterCharacteristics],
    /// Join helper used to look up the inputs of expressions and to localize/globalize them.
    input_mapper: &'a JoinInputMapper,
    /// For each input, the `(equivalence index, expression index)` positions of the equivalence
    /// expressions that reference that input.
    reverse_equivalences: Vec<Vec<(usize, usize)>>,
    /// For each input and each of its arrangements, whether the arrangement key contains all
    /// columns of some unique key of the input.
    unique_arrangement: Vec<Vec<bool>>,

    /// The order being built, as `(characteristics, key, input)` entries.
    order: Vec<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>,
    /// For each input, whether it has already been placed (see `order_input`).
    placed: Vec<bool>,
    /// For each input, the (input-local) expressions bound so far, kept sorted.
    bound: Vec<Vec<MirScalarExpr>>,
    /// For each equivalence class, whether it has been activated.
    equivalences_active: Vec<bool>,
    /// For each input, the positions in `arrangements[input]` that have already been offered as
    /// candidates.
    arrangement_active: Vec<Vec<usize>>,
    /// Candidate `(characteristics, key, input)` entries; the greatest entry is popped first.
    priority_queue:
        std::collections::BinaryHeap<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>,

    /// Forwarded to every `JoinInputCharacteristics::new` call.
    enable_join_prioritize_arranged: bool,
}
1057
1058impl<'a> Orderer<'a> {
1059    fn new(
1060        equivalences: &'a [Vec<MirScalarExpr>],
1061        arrangements: &'a [Vec<Vec<MirScalarExpr>>],
1062        unique_keys: &'a [Vec<Vec<usize>>],
1063        cardinalities: &'a [Option<usize>],
1064        filters: &'a [FilterCharacteristics],
1065        input_mapper: &'a JoinInputMapper,
1066        enable_join_prioritize_arranged: bool,
1067    ) -> Self {
1068        let inputs = arrangements.len();
1069        // A map from inputs to the equivalence classes in which they are referenced.
1070        let mut reverse_equivalences = vec![Vec::new(); inputs];
1071        for (index, equivalence) in equivalences.iter().enumerate() {
1072            for (index2, expr) in equivalence.iter().enumerate() {
1073                for input in input_mapper.lookup_inputs(expr) {
1074                    reverse_equivalences[input].push((index, index2));
1075                }
1076            }
1077        }
1078        // Per-arrangement information about uniqueness of the arrangement key.
1079        let mut unique_arrangement = vec![Vec::new(); inputs];
1080        for (input, keys) in arrangements.iter().enumerate() {
1081            for key in keys.iter() {
1082                unique_arrangement[input].push(unique_keys[input].iter().any(|cols| {
1083                    cols.iter()
1084                        .all(|c| key.contains(&MirScalarExpr::Column(*c)))
1085                }));
1086            }
1087        }
1088
1089        let order = Vec::with_capacity(inputs);
1090        let placed = vec![false; inputs];
1091        let bound = vec![Vec::new(); inputs];
1092        let equivalences_active = vec![false; equivalences.len()];
1093        let arrangement_active = vec![Vec::new(); inputs];
1094        let priority_queue = std::collections::BinaryHeap::new();
1095        Self {
1096            inputs,
1097            equivalences,
1098            arrangements,
1099            unique_keys,
1100            cardinalities,
1101            filters,
1102            input_mapper,
1103            reverse_equivalences,
1104            unique_arrangement,
1105            order,
1106            placed,
1107            bound,
1108            equivalences_active,
1109            arrangement_active,
1110            priority_queue,
1111            enable_join_prioritize_arranged,
1112        }
1113    }
1114
1115    fn optimize_order_for(
1116        &mut self,
1117        start: usize,
1118    ) -> Result<Vec<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>, TransformError> {
1119        self.order.clear();
1120        self.priority_queue.clear();
1121        for input in 0..self.inputs {
1122            self.placed[input] = false;
1123            self.bound[input].clear();
1124            self.arrangement_active[input].clear();
1125        }
1126        for index in 0..self.equivalences.len() {
1127            self.equivalences_active[index] = false;
1128        }
1129
1130        // Introduce cross joins as a possibility.
1131        for input in 0..self.inputs {
1132            let cardinality = self.cardinalities[input];
1133
1134            let is_unique = self.unique_keys[input].iter().any(|cols| cols.is_empty());
1135            if let Some(pos) = self.arrangements[input]
1136                .iter()
1137                .position(|key| key.is_empty())
1138            {
1139                self.arrangement_active[input].push(pos);
1140                self.priority_queue.push((
1141                    JoinInputCharacteristics::new(
1142                        is_unique,
1143                        0,
1144                        true,
1145                        cardinality,
1146                        self.filters[input].clone(),
1147                        input,
1148                        self.enable_join_prioritize_arranged,
1149                    ),
1150                    vec![],
1151                    input,
1152                ));
1153            } else {
1154                self.priority_queue.push((
1155                    JoinInputCharacteristics::new(
1156                        is_unique,
1157                        0,
1158                        false,
1159                        cardinality,
1160                        self.filters[input].clone(),
1161                        input,
1162                        self.enable_join_prioritize_arranged,
1163                    ),
1164                    vec![],
1165                    input,
1166                ));
1167            }
1168        }
1169
1170        // Main loop, ordering all the inputs.
1171        if self.inputs > 1 {
1172            self.order_input(start);
1173            while self.order.len() < self.inputs - 1 {
1174                let (characteristics, key, input) = self.priority_queue.pop().unwrap();
1175                // put the tuple into `self.order` unless the tuple with the same
1176                // input is already in `self.order`. For all inputs other than
1177                // start, `self.placed[input]` is an indication of whether a
1178                // corresponding tuple is already in `self.order`.
1179                if !self.placed[input] {
1180                    // non-starting inputs are ordered in decreasing priority
1181                    self.order.push((characteristics, key, input));
1182                    self.order_input(input);
1183                }
1184            }
1185        }
1186
1187        // `order` now contains all the inputs except the first. Let's create an item for the first
1188        // input. We know which input that is, but we need to compute a key and characteristics.
1189        // We start with some default values:
1190        let mut start_tuple = (
1191            JoinInputCharacteristics::new(
1192                false,
1193                0,
1194                false,
1195                self.cardinalities[start],
1196                self.filters[start].clone(),
1197                start,
1198                self.enable_join_prioritize_arranged,
1199            ),
1200            vec![],
1201            start,
1202        );
1203        // The key should line up with the key of the second input (if there is a second input).
1204        // (At this point, `order[0]` is what will eventually be `order[1]`, i.e., the second input.)
1205        if let Some((_, key, second)) = self.order.get(0) {
1206            // for each component of the key of the second input, try to find the corresponding key
1207            // component in the starting input
1208            let candidate_start_key = key
1209                .iter()
1210                .filter_map(|k| {
1211                    let k = self.input_mapper.map_expr_to_global(k.clone(), *second);
1212                    self.input_mapper
1213                        .find_bound_expr(&k, &[start], self.equivalences)
1214                        .map(|bound_key| self.input_mapper.map_expr_to_local(bound_key))
1215                })
1216                .collect::<Vec<_>>();
1217            if candidate_start_key.len() == key.len() {
1218                let cardinality = self.cardinalities[start];
1219                let is_unique = self.unique_keys[start].iter().any(|cols| {
1220                    cols.iter()
1221                        .all(|c| candidate_start_key.contains(&MirScalarExpr::Column(*c)))
1222                });
1223                let arranged = self.arrangements[start]
1224                    .iter()
1225                    .find(|arrangement_key| arrangement_key == &&candidate_start_key)
1226                    .is_some();
1227                start_tuple = (
1228                    JoinInputCharacteristics::new(
1229                        is_unique,
1230                        candidate_start_key.len(),
1231                        arranged,
1232                        cardinality,
1233                        self.filters[start].clone(),
1234                        start,
1235                        self.enable_join_prioritize_arranged,
1236                    ),
1237                    candidate_start_key,
1238                    start,
1239                );
1240            } else {
1241                // For the second input's key fields, there is nothing else to equate it with but
1242                // the fields of the first input, so we should find a match for each of the fields.
1243                // (For a later input, different fields of a key might be equated with fields coming
1244                // from various inputs.)
1245                // Technically, this happens as follows:
1246                // The second input must have been placed in the `priority_queue` either
1247                // 1) as a cross join possibility, or
1248                // 2) when we called `order_input` on the starting input.
1249                // In the 1) case, `key.len()` is 0. In the 2) case, it was the very first call to
1250                // `order_input`, which means that `placed` was true only for the
1251                // starting input, which means that `fully_supported` was true due to
1252                // one of the expressions referring only to the starting input.
1253                let msg = "Unreachable state in join order optimization".to_string();
1254                return Err(TransformError::Internal(msg));
1255                // (This couldn't be a soft_panic: we would form an arrangement with a wrong key.)
1256            }
1257        }
1258        self.order.insert(0, start_tuple);
1259
1260        Ok(std::mem::replace(&mut self.order, Vec::new()))
1261    }
1262
1263    /// Introduces a specific input and keys to the order, along with its characteristics.
1264    ///
1265    /// This method places a next element in the order, and updates the associated state
1266    /// about other candidates, including which columns are now bound and which potential
1267    /// keys are available to consider (both arranged, and unarranged).
1268    fn order_input(&mut self, input: usize) {
1269        self.placed[input] = true;
1270        for (equivalence, expr_index) in self.reverse_equivalences[input].iter() {
1271            if !self.equivalences_active[*equivalence] {
1272                // Placing `input` *may* activate the equivalence. Each of its columns
1273                // come in to scope, which may result in an expression in `equivalence`
1274                // becoming fully defined (when its support is contained in placed inputs)
1275                let fully_supported = self
1276                    .input_mapper
1277                    .lookup_inputs(&self.equivalences[*equivalence][*expr_index])
1278                    .all(|i| self.placed[i]);
1279                if fully_supported {
1280                    self.equivalences_active[*equivalence] = true;
1281                    for expr in self.equivalences[*equivalence].iter() {
1282                        // find the relations that columns in the expression belong to
1283                        let mut rels = self.input_mapper.lookup_inputs(expr);
1284                        // Skip the expression if
1285                        // * the expression is a literal -> this would translate
1286                        //   to `rels` being empty
1287                        // * the expression has columns belonging to more than
1288                        //   one relation -> TODO: see how we can plan better in
1289                        //   this case. Arguably, if this happens, it would
1290                        //   not be unreasonable to ask the user to write the
1291                        //   query better.
1292                        if let Some(rel) = rels.next() {
1293                            if rels.next().is_none() {
1294                                let expr = self.input_mapper.map_expr_to_local(expr.clone());
1295
1296                                // Update bound columns.
1297                                self.bound[rel].push(expr);
1298                                self.bound[rel].sort();
1299
1300                                // Reconsider all available arrangements.
1301                                for (pos, key) in self.arrangements[rel].iter().enumerate() {
1302                                    if !self.arrangement_active[rel].contains(&pos) {
1303                                        // TODO: support the restoration of the
1304                                        // following original lines, which have been
1305                                        // commented out because Materialize may
1306                                        // panic otherwise. The original line and comments
1307                                        // here are:
1308                                        // Determine if the arrangement is viable, which happens when the
1309                                        // support of its key is all bound.
1310                                        // if key.iter().all(|k| k.support().iter().all(|c| self.bound[*rel].contains(&ScalarExpr::Column(*c))) {
1311
1312                                        // Determine if the arrangement is viable,
1313                                        // which happens when all its key components are bound.
1314                                        if key.iter().all(|k| self.bound[rel].contains(k)) {
1315                                            self.arrangement_active[rel].push(pos);
1316                                            // TODO: This could be pre-computed, as it is independent of the order.
1317                                            let is_unique = self.unique_arrangement[rel][pos];
1318                                            self.priority_queue.push((
1319                                                JoinInputCharacteristics::new(
1320                                                    is_unique,
1321                                                    key.len(),
1322                                                    true,
1323                                                    self.cardinalities[rel],
1324                                                    self.filters[rel].clone(),
1325                                                    rel,
1326                                                    self.enable_join_prioritize_arranged,
1327                                                ),
1328                                                key.clone(),
1329                                                rel,
1330                                            ));
1331                                        }
1332                                    }
1333                                }
1334
1335                                // does the relation we're joining on have a unique key wrt what's already bound?
1336                                let is_unique = self.unique_keys[rel].iter().any(|cols| {
1337                                    cols.iter().all(|c| {
1338                                        self.bound[rel].contains(&MirScalarExpr::Column(*c))
1339                                    })
1340                                });
1341                                self.priority_queue.push((
1342                                    JoinInputCharacteristics::new(
1343                                        is_unique,
1344                                        self.bound[rel].len(),
1345                                        false,
1346                                        self.cardinalities[rel],
1347                                        self.filters[rel].clone(),
1348                                        rel,
1349                                        self.enable_join_prioritize_arranged,
1350                                    ),
1351                                    self.bound[rel].clone(),
1352                                    rel,
1353                                ));
1354                            }
1355                        }
1356                    }
1357                }
1358            }
1359        }
1360    }
1361}