mz_transform/
join_implementation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// Clippy's cognitive complexity is easy to reach.
11//#![allow(clippy::cognitive_complexity)]
12
13//! Determines the join implementation for join operators.
14//!
15//! This includes determining the type of join (e.g. differential linear, or delta queries),
16//! determining the orders of collections, lifting predicates if useful arrangements exist,
17//! and identifying opportunities to use indexes to replace filters.
18
19use std::collections::BTreeMap;
20
21use itertools::Itertools;
22use mz_expr::JoinImplementation::{Differential, IndexedFilter, Unimplemented};
23use mz_expr::visit::{Visit, VisitChildren};
24use mz_expr::{
25    FilterCharacteristics, Id, JoinInputCharacteristics, JoinInputMapper, MapFilterProject,
26    MirRelationExpr, MirScalarExpr, RECURSION_LIMIT,
27};
28use mz_ore::stack::{CheckedRecursion, RecursionGuard};
29use mz_ore::{soft_assert_or_log, soft_panic_or_log};
30use mz_repr::optimize::OptimizerFeatures;
31
32use crate::analysis::{Cardinality, DerivedBuilder};
33use crate::join_implementation::index_map::IndexMap;
34use crate::predicate_pushdown::PredicatePushdown;
35use crate::{StatisticsOracle, TransformCtx, TransformError};
36
/// Determines the join implementation for join operators.
///
/// The transform walks the expression tree recursively (see
/// `action_recursive`); the only state it carries is the guard that bounds
/// that recursion.
#[derive(Debug)]
pub struct JoinImplementation {
    /// Recursion guard exposed through the `CheckedRecursion` impl and
    /// consulted by `checked_recur`.
    recursion_guard: RecursionGuard,
}
42
43impl Default for JoinImplementation {
44    /// Construct a new [`JoinImplementation`] where `recursion_guard`
45    /// is initialized with [`RECURSION_LIMIT`] as limit.
46    fn default() -> JoinImplementation {
47        JoinImplementation {
48            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
49        }
50    }
51}
52
53impl CheckedRecursion for JoinImplementation {
54    fn recursion_guard(&self) -> &RecursionGuard {
55        &self.recursion_guard
56    }
57}
58
59impl crate::Transform for JoinImplementation {
60    fn name(&self) -> &'static str {
61        "JoinImplementation"
62    }
63
64    #[mz_ore::instrument(
65        target = "optimizer",
66        level = "debug",
67        fields(path.segment = "join_implementation")
68    )]
69    fn actually_perform_transform(
70        &self,
71        relation: &mut MirRelationExpr,
72        ctx: &mut TransformCtx,
73    ) -> Result<(), TransformError> {
74        let result = self.action_recursive(
75            relation,
76            &mut IndexMap::new(ctx.indexes),
77            ctx.stats,
78            ctx.features,
79        );
80        mz_repr::explain::trace_plan(&*relation);
81        result
82    }
83}
84
85impl JoinImplementation {
86    /// Pre-order visitor for each `MirRelationExpr` to find join operators.
87    ///
88    /// This method accumulates state about let-bound arrangements, so that
89    /// join operators can more accurately assess their available arrangements.
90    pub fn action_recursive(
91        &self,
92        relation: &mut MirRelationExpr,
93        indexes: &mut IndexMap,
94        stats: &dyn StatisticsOracle,
95        features: &OptimizerFeatures,
96    ) -> Result<(), TransformError> {
97        self.checked_recur(|_| {
98            if let MirRelationExpr::Let { id, value, body } = relation {
99                self.action_recursive(value, indexes, stats, features)?;
100                match &**value {
101                    MirRelationExpr::ArrangeBy { keys, .. } => {
102                        for key in keys {
103                            indexes.add_local(*id, key.clone());
104                        }
105                    }
106                    MirRelationExpr::Reduce { group_key, .. } => {
107                        indexes.add_local(
108                            *id,
109                            (0..group_key.len()).map(MirScalarExpr::column).collect(),
110                        );
111                    }
112                    _ => {}
113                }
114                self.action_recursive(body, indexes, stats, features)?;
115                indexes.remove_local(*id);
116                Ok(())
117            } else {
118                let (mfp, mfp_input) =
119                    MapFilterProject::extract_non_errors_from_expr_ref_mut(relation);
120                mfp_input.try_visit_mut_children(|e| {
121                    self.action_recursive(e, indexes, stats, features)
122                })?;
123                self.action(mfp_input, mfp, indexes, stats, features)?;
124                Ok(())
125            }
126        })
127    }
128
    /// Determines the join implementation for join operators.
    ///
    /// If `relation` is a `Join` whose implementation is `Unimplemented` or
    /// `Differential`, this plans it (or considers re-planning it as a delta
    /// join) and overwrites `relation` with the chosen plan. Any other
    /// expression is left untouched and `Ok(())` is returned.
    ///
    /// # Parameters
    ///
    /// - `relation`: the expression to (possibly) plan; replaced in place.
    /// - `mfp_above`: the map/filter/project found directly above `relation`.
    ///   It is only consulted to find out which of its predicates could be
    ///   pushed to which join input; nothing is actually pushed down here.
    /// - `indexes`: the arrangements known to exist on global and let-bound
    ///   collections.
    /// - `stats`: statistics used for cardinality estimates; only consulted
    ///   when `features.enable_cardinality_estimates` is set.
    /// - `features`: optimizer feature flags steering the planning.
    ///
    /// # Errors
    ///
    /// Propagates errors from pushing filters through the map of `mfp_above`
    /// and from computing filter characteristics.
    ///
    /// # Panics
    ///
    /// Panics if no differential join plan can be produced on the first
    /// planning run.
    pub fn action(
        &self,
        relation: &mut MirRelationExpr,
        mfp_above: MapFilterProject,
        indexes: &IndexMap,
        stats: &dyn StatisticsOracle,
        features: &OptimizerFeatures,
    ) -> Result<(), TransformError> {
        if let MirRelationExpr::Join {
            inputs,
            equivalences,
            // (Note that `JoinImplementation` runs in a fixpoint loop.)
            // If the current implementation is
            //  - Unimplemented, then we need to come up with an implementation.
            //  - Differential, then we consider switching to a Delta join, because we might have
            //    inserted some ArrangeBys that create new arrangements when we came up with the
            //    Differential plan, in which case a Delta join might have become viable.
            //  - Delta, then we are good already.
            //  - IndexedFilter, then we just leave that alone, because those are out of scope
            //    for JoinImplementation (they are created by `LiteralConstraints`).
            // We don't want to change from a Differential plan to another Differential plan, or
            // from a Delta plan to another Delta plan, because the second run cannot distinguish
            // between an ArrangeBy that marks an already existing arrangement and an ArrangeBy
            // that was inserted by a previous run of JoinImplementation. (We should eventually
            // refactor this to make ArrangeBy unambiguous somehow. Maybe move JoinImplementation
            // to the lowering.)
            implementation: implementation @ (Unimplemented | Differential(..)),
        } = relation
        {
            // If we eagerly plan delta joins, we don't need the second run to "pick up" delta joins
            // that could be planned with the arrangements from a differential. If such a delta
            // join were viable, we'd have already planned it the first time.
            if features.enable_eager_delta_joins && !matches!(implementation, Unimplemented) {
                return Ok(());
            }

            let input_types = inputs.iter().map(|i| i.typ()).collect::<Vec<_>>();

            // Canonicalize the equivalence classes
            if matches!(implementation, Unimplemented) {
                // Let's do this only if it's the first run of JoinImplementation, in which case we
                // are guaranteed to produce a new plan, which will be compatible with the modified
                // equivalences from the below call. Otherwise, if we already have a Differential or
                // a Delta join, then we might discard the new plan and go with the old plan, which
                // was created previously for the old equivalences, and might be invalid for the
                // modified equivalences from the below call. Note that this issue can arise only if
                // `canonicalize_equivalences` is not idempotent, which unfortunately seems to be
                // the case.
                mz_expr::canonicalize::canonicalize_equivalences(
                    equivalences,
                    input_types.iter().map(|t| &t.column_types),
                );
            }

            // Common information of broad utility.
            let input_mapper = JoinInputMapper::new_from_input_types(&input_types);

            // The first fundamental question is whether we should employ a delta query or not.
            //
            // Here we conservatively use the rule that if sufficient arrangements exist we will
            // use a delta query (except for 2-input joins). (With eager delta joins, we will
            // settle for fewer arrangements in the delta join than in the differential join.)
            //
            // An arrangement is considered available for an input
            // - if it is a `Get` with columns present in `indexes`,
            //   - or the same wrapped by an IndexedFilter,
            // - if it is an `ArrangeBy` with the columns present (note that the ArrangeBy might
            //   have been inserted by a previous run of JoinImplementation),
            // - if it is a `Reduce` whose output is arranged the right way,
            // - if it is a filter wrapped around either of these (see the mfp extraction).
            //
            // The `IndexedFilter` case above is to avoid losing some Delta joins
            // due to `IndexedFilter` on a join input. This means that in the absolute worst
            // case (when the `IndexedFilter` doesn't filter out anything), we will fully
            // re-create some arrangements that we already have for that input. This worst case
            // is still better than what can happen if we lose a Delta join: Differential joins
            // will create several new arrangements that don't even have a size bound, i.e.,
            // they might be larger than any user-created index.

            let unique_keys = input_types
                .into_iter()
                .map(|typ| typ.keys)
                .collect::<Vec<_>>();
            let mut available_arrangements = vec![Vec::new(); inputs.len()];
            let mut filters = Vec::with_capacity(inputs.len());
            let mut cardinalities = Vec::with_capacity(inputs.len());

            // We figure out what predicates from mfp_above could be pushed to which input.
            // We won't actually push these down now; this just informs FilterCharacteristics.
            let (map, mut filter, _) = mfp_above.as_map_filter_project();
            let all_errors = filter.iter().all(|p| p.is_literal_err());
            let (_, pushed_through_map) = PredicatePushdown::push_filters_through_map(
                &map,
                &mut filter,
                mfp_above.input_arity,
                all_errors,
            )?;
            let (_, push_downs) = PredicatePushdown::push_filters_through_join(
                &input_mapper,
                equivalences,
                pushed_through_map,
            );

            // Gather, per input: filter characteristics, a cardinality estimate (if enabled),
            // and the arrangements that are usable for the join.
            for index in 0..inputs.len() {
                // We can work around mfps, as we can lift the mfps into the join execution.
                let (mfp, input) = MapFilterProject::extract_non_errors_from_expr(&inputs[index]);
                let (_, filter, project) = mfp.as_map_filter_project();

                // We gather filter characteristics:
                // - From the filter that is directly at the top mfp of the input.
                // - IndexedFilter joins are constructed from literal equality filters.
                // - If the input is an ArrangeBy, then we gather filter characteristics from
                //   the mfp below the ArrangeBy. (JoinImplementation often inserts ArrangeBys.)
                // - From filters that could be pushed down from above the join to this input.
                //   (In LIR, these will be executed right after the join path executes the join
                //   for this input.)
                // - (No need to look behind Gets, see the inline_mfp argument of RelationCSE.)
                let mut characteristics = FilterCharacteristics::filter_characteristics(&filter)?;
                if matches!(
                    input,
                    MirRelationExpr::Join {
                        implementation: IndexedFilter(..),
                        ..
                    }
                ) {
                    characteristics.add_literal_equality();
                }
                if let MirRelationExpr::ArrangeBy {
                    input: arrange_by_input,
                    ..
                } = input
                {
                    let (mfp, input) =
                        MapFilterProject::extract_non_errors_from_expr(arrange_by_input);
                    let (_, filter, _) = mfp.as_map_filter_project();
                    characteristics |= FilterCharacteristics::filter_characteristics(&filter)?;
                    if matches!(
                        input,
                        MirRelationExpr::Join {
                            implementation: IndexedFilter(..),
                            ..
                        }
                    ) {
                        characteristics.add_literal_equality();
                    }
                }
                let push_down_characteristics =
                    FilterCharacteristics::filter_characteristics(&push_downs[index])?;
                let push_down_factor = push_down_characteristics.worst_case_scaling_factor();
                characteristics |= push_down_characteristics;

                // Estimate cardinality
                if features.enable_cardinality_estimates {
                    let mut builder = DerivedBuilder::new(features);
                    // TODO(mgree): it would be good to not have to copy the statistics here
                    builder.require(Cardinality::with_stats(stats.as_map()));
                    let derived = builder.visit(input);

                    let estimate = *derived.as_view().value::<Cardinality>().unwrap();
                    // we've already accounted for the filters _in_ the term; these capture the ones above
                    let scaled = estimate * push_down_factor;
                    cardinalities.push(scaled.rounded());
                } else {
                    cardinalities.push(None);
                }

                filters.push(characteristics);

                // Collect available arrangements on this input.
                match input {
                    MirRelationExpr::Get { id, typ: _, .. } => {
                        available_arrangements[index]
                            .extend(indexes.get(*id).map(|key| key.to_vec()));
                    }
                    MirRelationExpr::ArrangeBy { input, keys } => {
                        // We may use any presented arrangement keys.
                        available_arrangements[index].extend(keys.clone());
                        if let MirRelationExpr::Get { id, typ: _, .. } = &**input {
                            available_arrangements[index]
                                .extend(indexes.get(*id).map(|key| key.to_vec()));
                        }
                    }
                    MirRelationExpr::Reduce { group_key, .. } => {
                        // The first `group_key.len()` columns form an arrangement key.
                        available_arrangements[index]
                            .push((0..group_key.len()).map(MirScalarExpr::column).collect());
                    }
                    MirRelationExpr::Join {
                        implementation: IndexedFilter(id, ..),
                        ..
                    } => {
                        available_arrangements[index]
                            .extend(indexes.get(Id::Global(id.clone())).map(|key| key.to_vec()));
                    }
                    _ => {}
                }
                available_arrangements[index].sort();
                available_arrangements[index].dedup();
                // Maps a column of the input below the MFP to its position in the projection.
                let reverse_project = project
                    .into_iter()
                    .enumerate()
                    .map(|(i, c)| (c, i))
                    .collect::<BTreeMap<_, _>>();
                // Eliminate arrangements referring to columns that have been
                // projected away by surrounding MFPs.
                available_arrangements[index].retain(|key| {
                    key.iter()
                        .all(|k| k.support().iter().all(|c| reverse_project.contains_key(c)))
                });
                // Permute arrangements so columns reference what is after the MFP.
                for key in available_arrangements[index].iter_mut() {
                    for k in key.iter_mut() {
                        k.permute_map(&reverse_project);
                    }
                }
                // Currently we only support using arrangements all of whose
                // key components can be found in some equivalence.
                // Note: because `order_input` currently only finds arrangements
                // with exact key matches, the code below can be removed with no
                // change in behavior, but this is being kept for a future
                // TODO: expand `order_input`
                available_arrangements[index].retain(|key| {
                    key.iter().all(|k| {
                        let k = input_mapper.map_expr_to_global(k.clone(), index);
                        equivalences
                            .iter()
                            .any(|equivalence| equivalence.contains(&k))
                    })
                });
            }

            let old_implementation = implementation.clone();
            let num_inputs = inputs.len();
            // We've already planned a differential join... should we replace it with a delta join?
            //
            // This code path is only active when `eager_delta_joins` is false.
            if matches!(old_implementation, Differential(..)) {
                soft_assert_or_log!(
                    !features.enable_eager_delta_joins,
                    "eager delta joins run join implementation just once"
                );

                // Binary joins can't be delta joins---give up.
                if inputs.len() <= 2 {
                    return Ok(());
                }

                // Only plan a delta join if it needs no new arrangements (beyond what
                // differential planned).
                if let Ok((delta_query_plan, 0)) = delta_queries::plan(
                    relation,
                    &input_mapper,
                    &available_arrangements,
                    &unique_keys,
                    &cardinalities,
                    &filters,
                    features,
                ) {
                    tracing::debug!(plan = ?delta_query_plan, "replacing differential join with delta join");
                    *relation = delta_query_plan;
                }

                return Ok(());
            }

            // To have reached here, we must be in our first run of join planning.
            //
            // We plan a differential join first.
            let (differential_query_plan, differential_new_arrangements) = differential::plan(
                relation,
                &input_mapper,
                &available_arrangements,
                &unique_keys,
                &cardinalities,
                &filters,
                features,
            )
            .expect("Failed to produce a differential join plan");

            // Binary joins _must_ be differential. We won't plan a delta join.
            if num_inputs <= 2 {
                // if inputs.len() == 0 then something is very wrong.
                soft_assert_or_log!(num_inputs != 0, "join with no inputs");
                // if inputs.len() == 1:
                // Single input joins are filters and should be planned as
                // differential plans instead of delta queries. Because a
                // filter gets converted into a single input join only when
                // there are existing arrangements, without this early return,
                // filters will always be planned as delta queries.
                // Note: This can actually occur, see github-24511.slt.
                //
                // if inputs.len() == 2:
                // We decided to always plan this as a differential join for now, because the usual
                // advantage of a Delta join avoiding intermediate arrangements doesn't apply.
                // See more details here:
                // https://github.com/MaterializeInc/materialize/pull/16099#issuecomment-1316857374
                // https://github.com/MaterializeInc/materialize/pull/17708#discussion_r1112848747
                *relation = differential_query_plan;

                return Ok(());
            }

            // We are planning a multiway join for the first time.
            //
            // We compare the delta and differential join plans.
            //
            // A delta query requires that, for every path, there is an arrangement for every
            // input except for the starting one. Such queries are viable when:
            //
            //   (a) all the arrangements already exist, or
            //   (b) both:
            //       (i) we wouldn't create more arrangements than a differential join would
            //       (ii) `enable_eager_delta_joins` is on
            //
            // A differential join of k relations requires k-2 arrangements of intermediate
            // results (plus k arrangements of the inputs).
            //
            // Consider A ⨝ B ⨝ C ⨝ D. If planned as a differential join, we might have:
            //          A » B » C » D
            // This corresponds to the tree:
            //
            // A   B
            //  \ /
            //   ⨝   C
            //    \ /
            //     ⨝   D
            //      \ /
            //       ⨝
            //
            // At the two internal joins, the differential join will need two new arrangements.
            //
            // TODO(mgree): with this refactoring, we should compute `orders` once---both joins
            //              call `optimize_orders` and we can save some work.
            match delta_queries::plan(
                relation,
                &input_mapper,
                &available_arrangements,
                &unique_keys,
                &cardinalities,
                &filters,
                features,
            ) {
                // If delta plan's inputs need no new arrangements, pick the delta plan.
                Ok((delta_query_plan, 0)) => {
                    soft_assert_or_log!(
                        matches!(old_implementation, Unimplemented | Differential(..)),
                        "delta query plans should not be planned twice"
                    );
                    tracing::debug!(
                        plan = ?delta_query_plan,
                        differential_new_arrangements = differential_new_arrangements,
                        "picking delta query plan (no new arrangements)");
                    *relation = delta_query_plan;
                }
                // If the delta plan needs new arrangements, compare with the differential plan.
                Ok((delta_query_plan, delta_new_arrangements)) => {
                    tracing::debug!(
                        delta_new_arrangements = delta_new_arrangements,
                        differential_new_arrangements = differential_new_arrangements,
                        "comparing delta and differential joins",
                    );

                    // NOTE(review): the `Differential` case returned early above and the
                    // outer pattern only admits `Unimplemented | Differential(..)`, so
                    // `old_implementation` looks to always be `Unimplemented` here, making
                    // the final `else` branch below unreachable in practice --- confirm.
                    if features.enable_eager_delta_joins
                        && delta_new_arrangements <= differential_new_arrangements
                    {
                        // If we're eagerly planning delta joins, pick the delta plan if it's more economical.
                        tracing::debug!(
                            plan = ?delta_query_plan,
                            "picking delta query plan");
                        *relation = delta_query_plan;
                    } else if let Unimplemented = old_implementation {
                        // If we haven't planned the join yet, use the differential plan.
                        tracing::debug!(
                            plan = ?differential_query_plan,
                            "picking differential query plan");
                        *relation = differential_query_plan;
                    } else {
                        // But don't replace an existing differential plan.
                        tracing::debug!(plan = ?old_implementation, "keeping old plan");
                        soft_assert_or_log!(
                            matches!(old_implementation, Differential(..)),
                            "implemented plan in second run of join implementation should be differential \
                             if the delta plan is not viable"
                        )
                    }
                }
                // If we can't plan a delta join, plan a differential join.
                Err(err) => {
                    soft_panic_or_log!("delta planning failed: {err}");
                    tracing::debug!(
                        plan = ?differential_query_plan,
                        "picking differential query plan (delta planning failed)");
                    *relation = differential_query_plan;
                }
            }
        }
        Ok(())
    }
527}
528
529mod index_map {
530    use std::collections::BTreeMap;
531
532    use mz_expr::{Id, LocalId, MirScalarExpr};
533
534    use crate::IndexOracle;
535
536    /// Keeps track of local and global indexes available while descending
537    /// a `MirRelationExpr`.
538    #[derive(Debug)]
539    pub struct IndexMap<'a> {
540        local: BTreeMap<LocalId, Vec<Vec<MirScalarExpr>>>,
541        global: &'a dyn IndexOracle,
542    }
543
544    impl IndexMap<'_> {
545        /// Creates a new index map with knowledge of the provided global indexes.
546        pub fn new(global: &dyn IndexOracle) -> IndexMap<'_> {
547            IndexMap {
548                local: BTreeMap::new(),
549                global,
550            }
551        }
552
553        /// Adds a local index on the specified collection with the specified key.
554        pub fn add_local(&mut self, id: LocalId, key: Vec<MirScalarExpr>) {
555            self.local.entry(id).or_default().push(key)
556        }
557
558        /// Removes all local indexes on the specified collection.
559        pub fn remove_local(&mut self, id: LocalId) {
560            self.local.remove(&id);
561        }
562
563        pub fn get(&self, id: Id) -> Box<dyn Iterator<Item = &[MirScalarExpr]> + '_> {
564            match id {
565                Id::Global(id) => Box::new(self.global.indexes_on(id).map(|(_idx_id, key)| key)),
566                Id::Local(id) => Box::new(
567                    self.local
568                        .get(&id)
569                        .into_iter()
570                        .flatten()
571                        .map(|x| x.as_slice()),
572                ),
573            }
574        }
575    }
576}
577
578mod delta_queries {
579
580    use std::collections::BTreeSet;
581
582    use mz_expr::{
583        FilterCharacteristics, JoinImplementation, JoinInputMapper, MirRelationExpr, MirScalarExpr,
584    };
585    use mz_repr::optimize::OptimizerFeatures;
586
587    use crate::TransformError;
588
589    /// Creates a delta query plan, and any predicates that need to be lifted.
590    /// It also returns the number of new arrangements necessary for this plan.
591    ///
592    /// The method returns `Err` if any errors occur during planning.
593    pub fn plan(
594        join: &MirRelationExpr,
595        input_mapper: &JoinInputMapper,
596        available: &[Vec<Vec<MirScalarExpr>>],
597        unique_keys: &[Vec<Vec<usize>>],
598        cardinalities: &[Option<usize>],
599        filters: &[FilterCharacteristics],
600        optimizer_features: &OptimizerFeatures,
601    ) -> Result<(MirRelationExpr, usize), TransformError> {
602        let mut new_join = join.clone();
603
604        if let MirRelationExpr::Join {
605            inputs,
606            equivalences,
607            implementation,
608        } = &mut new_join
609        {
610            // Determine a viable order for each relation, or return `Err` if none found.
611            let orders = super::optimize_orders(
612                equivalences,
613                available,
614                unique_keys,
615                cardinalities,
616                filters,
617                input_mapper,
618                optimizer_features.enable_join_prioritize_arranged,
619            )?;
620
621            // Count new arrangements.
622            let new_arrangements: usize = orders
623                .iter()
624                .flat_map(|o| {
625                    o.iter().skip(1).filter_map(|(c, key, input)| {
626                        if c.arranged() {
627                            None
628                        } else {
629                            Some((input, key))
630                        }
631                    })
632                })
633                .collect::<BTreeSet<_>>()
634                .len();
635
636            // Convert the order information into specific (input, key, characteristics) information.
637            let mut orders = orders
638                .into_iter()
639                .map(|o| {
640                    o.into_iter()
641                        .skip(1)
642                        .map(|(c, key, r)| (r, key, Some(c)))
643                        .collect::<Vec<_>>()
644                })
645                .collect::<Vec<_>>();
646
647            // Implement arrangements in each of the inputs.
648            let (lifted_mfp, lifted_projections) =
649                super::implement_arrangements(inputs, available, orders.iter().flatten());
650
651            // Permute `order` to compensate for projections being lifted as part of
652            // the mfp lifting in `implement_arrangements`.
653            orders
654                .iter_mut()
655                .for_each(|order| super::permute_order(order, &lifted_projections));
656
657            *implementation = JoinImplementation::DeltaQuery(orders);
658
659            super::install_lifted_mfp(&mut new_join, lifted_mfp)?;
660
661            // Hooray done!
662            Ok((new_join, new_arrangements))
663        } else {
664            Err(TransformError::Internal(String::from(
665                "delta_queries::plan call on non-join expression",
666            )))
667        }
668    }
669}
670
671mod differential {
672    use std::collections::BTreeSet;
673
674    use mz_expr::{JoinImplementation, JoinInputMapper, MirRelationExpr, MirScalarExpr};
675    use mz_ore::soft_assert_eq_or_log;
676    use mz_repr::optimize::OptimizerFeatures;
677
678    use crate::TransformError;
679    use crate::join_implementation::FilterCharacteristics;
680
681    /// Creates a linear differential plan, and any predicates that need to be lifted.
682    /// It also returns the number of new arrangements necessary for this plan.
683    pub fn plan(
684        join: &MirRelationExpr,
685        input_mapper: &JoinInputMapper,
686        available: &[Vec<Vec<MirScalarExpr>>],
687        unique_keys: &[Vec<Vec<usize>>],
688        cardinalities: &[Option<usize>],
689        filters: &[FilterCharacteristics],
690        optimizer_features: &OptimizerFeatures,
691    ) -> Result<(MirRelationExpr, usize), TransformError> {
692        let mut new_join = join.clone();
693
694        if let MirRelationExpr::Join {
695            inputs,
696            equivalences,
697            implementation,
698        } = &mut new_join
699        {
700            // We compute one order for each possible starting point, and we will choose one from
701            // these.
702            //
703            // It is an invariant that the orders are in input order: the ith order begins with the ith input.
704            //
705            // We could change this preference at any point, but the list of orders should still inform.
706            // Important, we should choose something stable under re-ordering, to converge under fixed
707            // point iteration; we choose to start with the first input optimizing our criteria, which
708            // should remain stable even when promoted to the first position.
709            let mut orders = super::optimize_orders(
710                equivalences,
711                available,
712                unique_keys,
713                cardinalities,
714                filters,
715                input_mapper,
716                optimizer_features.enable_join_prioritize_arranged,
717            )?;
718
719            // Count new arrangements.
720            //
721            // We collect the count for each input, to be used to calculate `new_arrangements` below.
722            let new_input_arrangements: Vec<usize> = orders
723                .iter()
724                .map(|o| {
725                    o.iter()
726                        .filter_map(|(c, key, input)| {
727                            if c.arranged() {
728                                None
729                            } else {
730                                Some((*input, key.clone()))
731                            }
732                        })
733                        .collect::<BTreeSet<_>>()
734                        .len()
735                })
736                .collect();
737
738            // Inside each order, we take the `FilterCharacteristics` from each element, and OR it
739            // to every other element to the right. This is because we are gonna be looking for the
740            // worst `Characteristic` in every order, and for this it makes sense to include a
741            // filter in a `Characteristic` if the filter was applied not just at that input but
742            // any input before. For examples, see chbench.slt Query 02 and 11.
743            orders.iter_mut().for_each(|order| {
744                let mut sum = FilterCharacteristics::none();
745                for (jic, _, _) in order {
746                    *jic.filters() |= sum;
747                    sum = jic.filters().clone();
748                }
749            });
750
751            // `orders` has one order for each starting collection, and now we have to choose one
752            // from these. First, we find the worst `Characteristics` inside each order, and then we
753            // find the best one among these across all orders, which goes into
754            // `max_min_characteristics`.
755            let max_min_characteristics = orders
756                .iter()
757                .flat_map(|order| order.iter().map(|(c, _, _)| c.clone()).min())
758                .max();
759            let mut order = if let Some(max_min_characteristics) = max_min_characteristics {
760                orders
761                    .into_iter()
762                    .filter(|o| {
763                        o.iter().map(|(c, _, _)| c).min().unwrap() == &max_min_characteristics
764                    })
765                    // It can happen that `orders` has multiple such orders that have the same worst
766                    // `Characteristic` as `max_min_characteristics`. In this case, we go beyond the
767                    // worst `Characteristic`: we inspect the entire `Characteristic` vector of each
768                    // of these orders, and choose the best among these. This pushes bad stuff to
769                    // happen later, by which time we might have applied some filters.
770                    .max_by_key(|o| o.clone())
771                    .ok_or_else(|| {
772                        TransformError::Internal(String::from(
773                            "could not find max-min characteristics",
774                        ))
775                    })?
776                    .into_iter()
777                    .map(|(c, key, r)| (r, key, Some(c)))
778                    .collect::<Vec<_>>()
779            } else {
780                // if max_min_characteristics is None, then there must only be
781                // one input and thus only one order in orders
782                soft_assert_eq_or_log!(orders.len(), 1);
783                orders
784                    .remove(0)
785                    .into_iter()
786                    .map(|(c, key, r)| (r, key, Some(c)))
787                    .collect::<Vec<_>>()
788            };
789
790            let (start, mut start_key, start_characteristics) = order[0].clone();
791
792            // Count new arrangements for this choice of ordering.
793            let new_arrangements = inputs.len().saturating_sub(2) + new_input_arrangements[start];
794
795            // Implement arrangements in each of the inputs.
796            let (lifted_mfp, lifted_projections) =
797                super::implement_arrangements(inputs, available, order.iter());
798
799            // Permute `start_key` and `order` to compensate for projections being lifted as part of
800            // the mfp lifting in `implement_arrangements`.
801            if let Some(proj) = &lifted_projections[start] {
802                start_key.iter_mut().for_each(|k| {
803                    k.permute(proj);
804                });
805            }
806            super::permute_order(&mut order, &lifted_projections);
807
808            // now that the starting arrangement has been implemented,
809            // remove it from `order` so `order` only contains information
810            // about the other inputs
811            order.remove(0);
812
813            // Install the implementation.
814            *implementation = JoinImplementation::Differential(
815                (start, Some(start_key), start_characteristics),
816                order,
817            );
818
819            super::install_lifted_mfp(&mut new_join, lifted_mfp)?;
820
821            // Hooray done!
822            Ok((new_join, new_arrangements))
823        } else {
824            Err(TransformError::Internal(String::from(
825                "differential::plan call on non-join expression.",
826            )))
827        }
828    }
829}
830
831/// Modify `inputs` to ensure specified arrangements are available.
832///
833/// Lift filter predicates when all needed arrangements are otherwise available.
834///
835/// Returns
836///  - The lifted mfps combined into one mfp.
837///  - Permutations for each input, which were lifted as part of the mfp lifting. These should be
838///    applied to the join order.
839fn implement_arrangements<'a>(
840    inputs: &mut [MirRelationExpr],
841    available_arrangements: &[Vec<Vec<MirScalarExpr>>],
842    needed_arrangements: impl Iterator<
843        Item = &'a (usize, Vec<MirScalarExpr>, Option<JoinInputCharacteristics>),
844    >,
845) -> (MapFilterProject, Vec<Option<Vec<usize>>>) {
846    // Collect needed arrangements by source index.
847    let mut needed = vec![Vec::new(); inputs.len()];
848    for (index, key, _characteristics) in needed_arrangements {
849        needed[*index].push(key.clone());
850    }
851
852    let mut lifted_mfps = vec![None; inputs.len()];
853    let mut lifted_projections = vec![None; inputs.len()];
854
855    // Transform inputs[index] based on needed and available arrangements.
856    // Specifically, lift intervening mfps if all arrangements exist.
857    for (index, needed) in needed.iter_mut().enumerate() {
858        needed.sort();
859        needed.dedup();
860        // We should lift any mfps, iff all arrangements are otherwise available.
861        if !needed.is_empty()
862            && needed
863                .iter()
864                .all(|key| available_arrangements[index].contains(key))
865        {
866            lifted_mfps[index] = Some(MapFilterProject::extract_non_errors_from_expr_mut(
867                &mut inputs[index],
868            ));
869        }
870        // Clean up existing arrangements, and install one with the needed keys.
871        while let MirRelationExpr::ArrangeBy { input: inner, .. } = &mut inputs[index] {
872            inputs[index] = inner.take_dangerous();
873        }
874        // If a mfp was lifted in order to install the arrangement, permute the arrangement and
875        // save the lifted projection.
876        if let Some(lifted_mfp) = &lifted_mfps[index] {
877            let (_, _, project) = lifted_mfp.as_map_filter_project();
878            for arrangement_key in needed.iter_mut() {
879                for k in arrangement_key.iter_mut() {
880                    k.permute(&project);
881                }
882            }
883            lifted_projections[index] = Some(project);
884        }
885        if !needed.is_empty() {
886            inputs[index] = MirRelationExpr::arrange_by(inputs[index].take_dangerous(), needed);
887        }
888    }
889
890    // Combine lifted mfps into one.
891    let new_join_mapper = JoinInputMapper::new(inputs);
892    let mut arity = new_join_mapper.total_columns();
893    let combined_mfp = MapFilterProject::new(arity);
894    let mut combined_filter = Vec::new();
895    let mut combined_map = Vec::new();
896    let mut combined_project = Vec::new();
897    for (index, lifted_mfp) in lifted_mfps.into_iter().enumerate() {
898        if let Some(mut lifted_mfp) = lifted_mfp {
899            let column_map = new_join_mapper
900                .local_columns(index)
901                .zip_eq(new_join_mapper.global_columns(index))
902                .collect::<BTreeMap<_, _>>();
903            lifted_mfp.permute_fn(
904                // globalize all input column references
905                |c| column_map[&c],
906                // shift the position of scalars to be after the last input
907                // column
908                arity,
909            );
910            let (mut map, mut filter, mut project) = lifted_mfp.as_map_filter_project();
911            arity += map.len();
912            combined_map.append(&mut map);
913            combined_filter.append(&mut filter);
914            combined_project.append(&mut project);
915        } else {
916            combined_project.extend(new_join_mapper.global_columns(index));
917        }
918    }
919
920    (
921        combined_mfp
922            .map(combined_map)
923            .filter(combined_filter)
924            .project(combined_project),
925        lifted_projections,
926    )
927}
928
929/// This function continues the surgery that `implement_arrangements` started.
930///
931/// (In theory, this function could be merged into `implement_arrangements`, but it would be a bit
932/// painful, because we need to access the join's `implementation` after `implement_arrangements`,
933/// which would be somewhat convoluted after what `install_lifted_mfp` does.)
934///
935/// The given MFP should be the merged MFP from all the lifted inputs MFPs.
936///
937/// `install_lifted_mfp` mutates the given join expression as follows:
938/// - Puts the given MFP on top of the Join.
939/// - Attends to `equivalences`, which was invalidated by `implement_arrangements` if it refers to a
940///   column that was permuted or created by the given MFP.
941/// - Canonicalizes scalar expressions in maps and filters with respect to the join equivalences.
942///   See inline comment for more details.
943fn install_lifted_mfp(
944    new_join: &mut MirRelationExpr,
945    mfp: MapFilterProject,
946) -> Result<(), TransformError> {
947    if !mfp.is_identity() {
948        let (mut map, mut filter, project) = mfp.as_map_filter_project();
949        if let MirRelationExpr::Join { equivalences, .. } = new_join {
950            for equivalence in equivalences.iter_mut() {
951                for expr in equivalence.iter_mut() {
952                    // permute `equivalences` in light of the project being lifted
953                    expr.permute(&project);
954                    // if column references refer to mapped expressions that have been
955                    // lifted, replace the column reference with the mapped expression.
956                    expr.visit_mut_pre(&mut |e| {
957                        // This has to be a loop! This is because it can happen that the new
958                        // expression is again a column reference, in which case the visitor
959                        // wouldn't be called on it again. (The visitation continues with the
960                        // children of the new expression, but won't visit the new expression
961                        // itself again.)
962                        while let MirScalarExpr::Column(c, _) = e {
963                            if *c >= mfp.input_arity {
964                                *e = map[*c - mfp.input_arity].clone();
965                            } else {
966                                break;
967                            }
968                        }
969                    })?;
970                }
971            }
972            // Canonicalize scalar expressions in maps and filters with respect to the join
973            // equivalences. This often makes some filters identical, which are then removed.
974            // The identical filters come from either
975            //  - lifting several predicates that originally were pushed down by localizing to more
976            //    than one inputs;
977            //  - individual IS NOT NULL filters on each of the inputs, which become identical
978            //    when rewritten using the join equivalences.
979            //  (This allows for almost the same optimizations as when `Demand`
980            //  used to insert Projections that were marking some columns to be
981            //  identical, when Demand used to run after `JoinImplementation`.)
982            let canonicalizer_map = mz_expr::canonicalize::get_canonicalizer_map(equivalences);
983            for expr in map.iter_mut().chain(filter.iter_mut()) {
984                expr.visit_mut_post(&mut |e| {
985                    if let Some(canonical_expr) = canonicalizer_map.get(e) {
986                        *e = canonical_expr.clone();
987                    }
988                })?
989            }
990        }
991        *new_join = new_join.clone().map(map).filter(filter).project(project);
992    }
993    Ok(())
994}
995
996/// Permute the keys in `order` to compensate for projections being lifted from inputs.
997/// `lifted_projections` has an optional projection for each input.
998fn permute_order(
999    order: &mut Vec<(usize, Vec<MirScalarExpr>, Option<JoinInputCharacteristics>)>,
1000    lifted_projections: &Vec<Option<Vec<usize>>>,
1001) {
1002    order.iter_mut().for_each(|(index, key, _)| {
1003        key.iter_mut().for_each(|kc| {
1004            if let Some(proj) = &lifted_projections[*index] {
1005                kc.permute(proj);
1006            }
1007        })
1008    })
1009}
1010
1011// Computes the best join orders for each input.
1012//
1013// If there are N inputs, returns N orders, with the ith input starting the ith order.
1014fn optimize_orders(
1015    equivalences: &[Vec<MirScalarExpr>], // join equivalences: inside a Vec, the exprs are equivalent
1016    available: &[Vec<Vec<MirScalarExpr>>], // available arrangements per input
1017    unique_keys: &[Vec<Vec<usize>>],     // unique keys per input
1018    cardinalities: &[Option<usize>],     // cardinalities of input relations
1019    filters: &[FilterCharacteristics],   // filter characteristics per input
1020    input_mapper: &JoinInputMapper,      // join helper
1021    enable_join_prioritize_arranged: bool,
1022) -> Result<Vec<Vec<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>>, TransformError> {
1023    let mut orderer = Orderer::new(
1024        equivalences,
1025        available,
1026        unique_keys,
1027        cardinalities,
1028        filters,
1029        input_mapper,
1030        enable_join_prioritize_arranged,
1031    );
1032    (0..available.len())
1033        .map(move |i| orderer.optimize_order_for(i))
1034        .collect::<Result<Vec<_>, _>>()
1035}
1036
/// Computes join orders for a fixed set of join inputs.
///
/// The first group of fields describes the join and is set once in `new`. The second group is
/// scratch state that `optimize_order_for` resets at the start of each call, so one `Orderer`
/// can be reused to compute the order for every starting input.
struct Orderer<'a> {
    /// The number of join inputs (the length of `arrangements`).
    inputs: usize,
    /// The join equivalences: the expressions inside each inner slice are equivalent.
    equivalences: &'a [Vec<MirScalarExpr>],
    /// The available arrangement keys, per input.
    arrangements: &'a [Vec<Vec<MirScalarExpr>>],
    /// The unique keys (as column indexes), per input.
    unique_keys: &'a [Vec<Vec<usize>>],
    /// The cardinality of each input, where known.
    cardinalities: &'a [Option<usize>],
    /// The filter characteristics of each input.
    filters: &'a [FilterCharacteristics],
    /// Helper for translating between input-local and join-global expressions.
    input_mapper: &'a JoinInputMapper,
    /// A map from inputs to the equivalence classes in which they are referenced, as
    /// `(index of the equivalence class, index of the expression within that class)` pairs.
    reverse_equivalences: Vec<Vec<(usize, usize)>>,
    /// Per input and per available arrangement, whether the arrangement key contains all
    /// columns of some unique key of that input.
    unique_arrangement: Vec<Vec<bool>>,

    /// The order being built by the current call to `optimize_order_for`.
    order: Vec<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>,
    /// Per input, whether `order_input` has been called on it for the current order.
    placed: Vec<bool>,
    /// Per input, the (input-local) expressions that are bound so far; kept sorted.
    bound: Vec<Vec<MirScalarExpr>>,
    /// Per equivalence class, whether it has been activated for the current order.
    equivalences_active: Vec<bool>,
    /// Per input, the positions in `arrangements[input]` that have already been offered as
    /// candidates for the current order.
    arrangement_active: Vec<Vec<usize>>,
    /// The candidate `(characteristics, key, input)` tuples; the greatest one is popped first.
    priority_queue:
        std::collections::BinaryHeap<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>,

    /// Passed through to every `JoinInputCharacteristics::new` call.
    enable_join_prioritize_arranged: bool,
}
1058
1059impl<'a> Orderer<'a> {
1060    fn new(
1061        equivalences: &'a [Vec<MirScalarExpr>],
1062        arrangements: &'a [Vec<Vec<MirScalarExpr>>],
1063        unique_keys: &'a [Vec<Vec<usize>>],
1064        cardinalities: &'a [Option<usize>],
1065        filters: &'a [FilterCharacteristics],
1066        input_mapper: &'a JoinInputMapper,
1067        enable_join_prioritize_arranged: bool,
1068    ) -> Self {
1069        let inputs = arrangements.len();
1070        // A map from inputs to the equivalence classes in which they are referenced.
1071        let mut reverse_equivalences = vec![Vec::new(); inputs];
1072        for (index, equivalence) in equivalences.iter().enumerate() {
1073            for (index2, expr) in equivalence.iter().enumerate() {
1074                for input in input_mapper.lookup_inputs(expr) {
1075                    reverse_equivalences[input].push((index, index2));
1076                }
1077            }
1078        }
1079        // Per-arrangement information about uniqueness of the arrangement key.
1080        let mut unique_arrangement = vec![Vec::new(); inputs];
1081        for (input, keys) in arrangements.iter().enumerate() {
1082            for key in keys.iter() {
1083                unique_arrangement[input].push(unique_keys[input].iter().any(|cols| {
1084                    cols.iter()
1085                        .all(|c| key.contains(&MirScalarExpr::column(*c)))
1086                }));
1087            }
1088        }
1089
1090        let order = Vec::with_capacity(inputs);
1091        let placed = vec![false; inputs];
1092        let bound = vec![Vec::new(); inputs];
1093        let equivalences_active = vec![false; equivalences.len()];
1094        let arrangement_active = vec![Vec::new(); inputs];
1095        let priority_queue = std::collections::BinaryHeap::new();
1096        Self {
1097            inputs,
1098            equivalences,
1099            arrangements,
1100            unique_keys,
1101            cardinalities,
1102            filters,
1103            input_mapper,
1104            reverse_equivalences,
1105            unique_arrangement,
1106            order,
1107            placed,
1108            bound,
1109            equivalences_active,
1110            arrangement_active,
1111            priority_queue,
1112            enable_join_prioritize_arranged,
1113        }
1114    }
1115
1116    fn optimize_order_for(
1117        &mut self,
1118        start: usize,
1119    ) -> Result<Vec<(JoinInputCharacteristics, Vec<MirScalarExpr>, usize)>, TransformError> {
1120        self.order.clear();
1121        self.priority_queue.clear();
1122        for input in 0..self.inputs {
1123            self.placed[input] = false;
1124            self.bound[input].clear();
1125            self.arrangement_active[input].clear();
1126        }
1127        for index in 0..self.equivalences.len() {
1128            self.equivalences_active[index] = false;
1129        }
1130
1131        // Introduce cross joins as a possibility.
1132        for input in 0..self.inputs {
1133            let cardinality = self.cardinalities[input];
1134
1135            let is_unique = self.unique_keys[input].iter().any(|cols| cols.is_empty());
1136            if let Some(pos) = self.arrangements[input]
1137                .iter()
1138                .position(|key| key.is_empty())
1139            {
1140                self.arrangement_active[input].push(pos);
1141                self.priority_queue.push((
1142                    JoinInputCharacteristics::new(
1143                        is_unique,
1144                        0,
1145                        true,
1146                        cardinality,
1147                        self.filters[input].clone(),
1148                        input,
1149                        self.enable_join_prioritize_arranged,
1150                    ),
1151                    vec![],
1152                    input,
1153                ));
1154            } else {
1155                self.priority_queue.push((
1156                    JoinInputCharacteristics::new(
1157                        is_unique,
1158                        0,
1159                        false,
1160                        cardinality,
1161                        self.filters[input].clone(),
1162                        input,
1163                        self.enable_join_prioritize_arranged,
1164                    ),
1165                    vec![],
1166                    input,
1167                ));
1168            }
1169        }
1170
1171        // Main loop, ordering all the inputs.
1172        if self.inputs > 1 {
1173            self.order_input(start);
1174            while self.order.len() < self.inputs - 1 {
1175                let (characteristics, key, input) = self.priority_queue.pop().unwrap();
1176                // put the tuple into `self.order` unless the tuple with the same
1177                // input is already in `self.order`. For all inputs other than
1178                // start, `self.placed[input]` is an indication of whether a
1179                // corresponding tuple is already in `self.order`.
1180                if !self.placed[input] {
1181                    // non-starting inputs are ordered in decreasing priority
1182                    self.order.push((characteristics, key, input));
1183                    self.order_input(input);
1184                }
1185            }
1186        }
1187
1188        // `order` now contains all the inputs except the first. Let's create an item for the first
1189        // input. We know which input that is, but we need to compute a key and characteristics.
1190        // We start with some default values:
1191        let mut start_tuple = (
1192            JoinInputCharacteristics::new(
1193                false,
1194                0,
1195                false,
1196                self.cardinalities[start],
1197                self.filters[start].clone(),
1198                start,
1199                self.enable_join_prioritize_arranged,
1200            ),
1201            vec![],
1202            start,
1203        );
1204        // The key should line up with the key of the second input (if there is a second input).
1205        // (At this point, `order[0]` is what will eventually be `order[1]`, i.e., the second input.)
1206        if let Some((_, key, second)) = self.order.get(0) {
1207            // for each component of the key of the second input, try to find the corresponding key
1208            // component in the starting input
1209            let candidate_start_key = key
1210                .iter()
1211                .filter_map(|k| {
1212                    let k = self.input_mapper.map_expr_to_global(k.clone(), *second);
1213                    self.input_mapper
1214                        .find_bound_expr(&k, &[start], self.equivalences)
1215                        .map(|bound_key| self.input_mapper.map_expr_to_local(bound_key))
1216                })
1217                .collect::<Vec<_>>();
1218            if candidate_start_key.len() == key.len() {
1219                let cardinality = self.cardinalities[start];
1220                let is_unique = self.unique_keys[start].iter().any(|cols| {
1221                    cols.iter()
1222                        .all(|c| candidate_start_key.contains(&MirScalarExpr::column(*c)))
1223                });
1224                let arranged = self.arrangements[start]
1225                    .iter()
1226                    .find(|arrangement_key| arrangement_key == &&candidate_start_key)
1227                    .is_some();
1228                start_tuple = (
1229                    JoinInputCharacteristics::new(
1230                        is_unique,
1231                        candidate_start_key.len(),
1232                        arranged,
1233                        cardinality,
1234                        self.filters[start].clone(),
1235                        start,
1236                        self.enable_join_prioritize_arranged,
1237                    ),
1238                    candidate_start_key,
1239                    start,
1240                );
1241            } else {
1242                // For the second input's key fields, there is nothing else to equate it with but
1243                // the fields of the first input, so we should find a match for each of the fields.
1244                // (For a later input, different fields of a key might be equated with fields coming
1245                // from various inputs.)
1246                // Technically, this happens as follows:
1247                // The second input must have been placed in the `priority_queue` either
1248                // 1) as a cross join possibility, or
1249                // 2) when we called `order_input` on the starting input.
1250                // In the 1) case, `key.len()` is 0. In the 2) case, it was the very first call to
1251                // `order_input`, which means that `placed` was true only for the
1252                // starting input, which means that `fully_supported` was true due to
1253                // one of the expressions referring only to the starting input.
1254                let msg = "Unreachable state in join order optimization".to_string();
1255                return Err(TransformError::Internal(msg));
1256                // (This couldn't be a soft_panic: we would form an arrangement with a wrong key.)
1257            }
1258        }
1259        self.order.insert(0, start_tuple);
1260
1261        Ok(std::mem::replace(&mut self.order, Vec::new()))
1262    }
1263
1264    /// Introduces a specific input and keys to the order, along with its characteristics.
1265    ///
1266    /// This method places a next element in the order, and updates the associated state
1267    /// about other candidates, including which columns are now bound and which potential
1268    /// keys are available to consider (both arranged, and unarranged).
1269    fn order_input(&mut self, input: usize) {
1270        self.placed[input] = true;
1271        for (equivalence, expr_index) in self.reverse_equivalences[input].iter() {
1272            if !self.equivalences_active[*equivalence] {
1273                // Placing `input` *may* activate the equivalence. Each of its columns
1274                // come in to scope, which may result in an expression in `equivalence`
1275                // becoming fully defined (when its support is contained in placed inputs)
1276                let fully_supported = self
1277                    .input_mapper
1278                    .lookup_inputs(&self.equivalences[*equivalence][*expr_index])
1279                    .all(|i| self.placed[i]);
1280                if fully_supported {
1281                    self.equivalences_active[*equivalence] = true;
1282                    for expr in self.equivalences[*equivalence].iter() {
1283                        // find the relations that columns in the expression belong to
1284                        let mut rels = self.input_mapper.lookup_inputs(expr);
1285                        // Skip the expression if
1286                        // * the expression is a literal -> this would translate
1287                        //   to `rels` being empty
1288                        // * the expression has columns belonging to more than
1289                        //   one relation -> TODO: see how we can plan better in
1290                        //   this case. Arguably, if this happens, it would
1291                        //   not be unreasonable to ask the user to write the
1292                        //   query better.
1293                        if let Some(rel) = rels.next() {
1294                            if rels.next().is_none() {
1295                                let expr = self.input_mapper.map_expr_to_local(expr.clone());
1296
1297                                // Update bound columns.
1298                                self.bound[rel].push(expr);
1299                                self.bound[rel].sort();
1300
1301                                // Reconsider all available arrangements.
1302                                for (pos, key) in self.arrangements[rel].iter().enumerate() {
1303                                    if !self.arrangement_active[rel].contains(&pos) {
1304                                        // TODO: support the restoration of the
1305                                        // following original lines, which have been
1306                                        // commented out because Materialize may
1307                                        // panic otherwise. The original line and comments
1308                                        // here are:
1309                                        // Determine if the arrangement is viable, which happens when the
1310                                        // support of its key is all bound.
1311                                        // if key.iter().all(|k| k.support().iter().all(|c| self.bound[*rel].contains(&ScalarExpr::Column(*c))) {
1312
1313                                        // Determine if the arrangement is viable,
1314                                        // which happens when all its key components are bound.
1315                                        if key.iter().all(|k| self.bound[rel].contains(k)) {
1316                                            self.arrangement_active[rel].push(pos);
1317                                            // TODO: This could be pre-computed, as it is independent of the order.
1318                                            let is_unique = self.unique_arrangement[rel][pos];
1319                                            self.priority_queue.push((
1320                                                JoinInputCharacteristics::new(
1321                                                    is_unique,
1322                                                    key.len(),
1323                                                    true,
1324                                                    self.cardinalities[rel],
1325                                                    self.filters[rel].clone(),
1326                                                    rel,
1327                                                    self.enable_join_prioritize_arranged,
1328                                                ),
1329                                                key.clone(),
1330                                                rel,
1331                                            ));
1332                                        }
1333                                    }
1334                                }
1335
1336                                // does the relation we're joining on have a unique key wrt what's already bound?
1337                                let is_unique = self.unique_keys[rel].iter().any(|cols| {
1338                                    cols.iter().all(|c| {
1339                                        self.bound[rel].contains(&MirScalarExpr::column(*c))
1340                                    })
1341                                });
1342                                self.priority_queue.push((
1343                                    JoinInputCharacteristics::new(
1344                                        is_unique,
1345                                        self.bound[rel].len(),
1346                                        false,
1347                                        self.cardinalities[rel],
1348                                        self.filters[rel].clone(),
1349                                        rel,
1350                                        self.enable_join_prioritize_arranged,
1351                                    ),
1352                                    self.bound[rel].clone(),
1353                                    rel,
1354                                ));
1355                            }
1356                        }
1357                    }
1358                }
1359            }
1360        }
1361    }
1362}