mz_transform/
literal_lifting.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Hoist literal values from maps wherever possible.
11//!
12//! This transform specifically looks for `MirRelationExpr::Map` operators
13//! where any of the `ScalarExpr` expressions are literals. Whenever it
14//! can, it lifts those expressions through or around operators.
15//!
16//! The main feature of this operator is that it allows transformations
17//! to locally change the shape of operators, presenting fewer columns
18//! when they are unused and replacing them with mapped default values.
19//! The mapped default values can then be lifted and ideally absorbed.
20//! This type of transformation is difficult to make otherwise, as it
21//! is not easy to locally change the shape of relations.
22
23use std::collections::BTreeMap;
24
25use itertools::{Itertools, zip_eq};
26use mz_expr::JoinImplementation::IndexedFilter;
27use mz_expr::visit::Visit;
28use mz_expr::{Id, JoinInputMapper, MirRelationExpr, MirScalarExpr, RECURSION_LIMIT};
29use mz_ore::stack::{CheckedRecursion, RecursionGuard};
30use mz_repr::{Row, RowPacker};
31
32use crate::TransformCtx;
33
/// Hoist literal values from maps wherever possible.
///
/// The transform itself is stateless apart from the recursion guard; all of
/// the rewriting logic lives in [`LiteralLifting::action`].
#[derive(Debug)]
pub struct LiteralLifting {
    /// Guard exposed through the `CheckedRecursion` impl. `action` wraps its
    /// body in `checked_recur`, which consults this guard on every recursive
    /// descent.
    recursion_guard: RecursionGuard,
}
39
40impl Default for LiteralLifting {
41    fn default() -> LiteralLifting {
42        LiteralLifting {
43            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
44        }
45    }
46}
47
48impl CheckedRecursion for LiteralLifting {
49    fn recursion_guard(&self) -> &RecursionGuard {
50        &self.recursion_guard
51    }
52}
53
54impl crate::Transform for LiteralLifting {
55    fn name(&self) -> &'static str {
56        "LiteralLifting"
57    }
58
59    #[mz_ore::instrument(
60        target = "optimizer",
61        level = "debug",
62        fields(path.segment = "literal_lifting")
63    )]
64    fn actually_perform_transform(
65        &self,
66        relation: &mut MirRelationExpr,
67        _: &mut TransformCtx,
68    ) -> Result<(), crate::TransformError> {
69        let literals = self.action(relation, &mut BTreeMap::new())?;
70        if !literals.is_empty() {
71            // Literals return up the root should be re-installed.
72            *relation = relation.take_dangerous().map(literals);
73        }
74        mz_repr::explain::trace_plan(&*relation);
75        Ok(())
76    }
77}
78
79impl LiteralLifting {
80    /// Hoist literal values from maps wherever possible.
81    ///
82    /// Returns a list of literal scalar expressions that must be appended
83    /// to the result before it can be correctly used. The intent is that
84    /// this action extracts a maximal set of literals from `relation`,
85    /// which can then often be propagated further up and inlined in any
86    /// expressions as it goes.
87    ///
88    /// In several cases, we only manage to extract literals from the final
89    /// columns. But in those cases where it is possible, permutations are
90    /// used to move all of the literals to the final columns, and then rely
    /// on projection hoisting to allow these literals to move up the AST.
92    ///
93    /// TODO: The literals from the final columns are returned as the result
94    /// of this method, whereas literals in intermediate columns are extracted
95    /// using permutations. The reason for this different treatment is that in
96    /// some cases it is not possible to remove the projection of the
97    /// permutation, preventing the lifting of a literal that could otherwise
    /// be lifted, the following example being one of them:
99    ///
100    /// %0 =
101    /// | Constant (1, 2, 3) (2, 2, 3)
102    ///
103    /// %1 =
104    /// | Constant (4, 3, 3) (4, 5, 3)
105    ///
106    /// %2 =
107    /// | Union %0 %1
108    ///
109    /// If final literals weren't treated differently, the example above would
110    /// lead to the following transformed plan:
111    ///
112    /// %0 =
113    /// | Constant (1) (2)
114    /// | Map 2, 3
115    /// | Project (#0..#2)
116    ///
117    /// %1 =
118    /// | Constant (3) (5)
119    /// | Map 4, 3
120    /// | Project (#1, #0, #2)
121    ///
122    /// %2 =
123    /// | Union %0 %1
124    ///
125    /// Since the union branches have different projections, they cannot be
126    /// removed, preventing literal 3 from being lifted further.
127    ///
128    /// In theory, all literals could be treated in the same way if this method
129    /// returned both a list of literals and a projection vector, making the
130    /// caller have to deal with the reshuffling.
131    /// (see <https://github.com/MaterializeInc/database-issues/issues/2055>)
132    ///
133    pub fn action(
134        &self,
135        relation: &mut MirRelationExpr,
136        // Map from names to literals required for appending.
137        gets: &mut BTreeMap<Id, Vec<MirScalarExpr>>,
138    ) -> Result<Vec<MirScalarExpr>, crate::TransformError> {
139        self.checked_recur(|_| {
140            match relation {
141                MirRelationExpr::Constant { rows, typ } => {
142                    // From the back to the front, check if all values are identical.
143                    let mut the_same = vec![true; typ.arity()];
144                    if let Ok([(row, _cnt), rows @ ..]) = rows.as_deref_mut() {
145                        let mut data = row.unpack();
146                        assert_eq!(the_same.len(), data.len());
147                        for (row, _cnt) in rows.iter() {
148                            let other = row.unpack();
149                            assert_eq!(the_same.len(), other.len());
150                            for index in 0..the_same.len() {
151                                the_same[index] = the_same[index] && (data[index] == other[index]);
152                            }
153                        }
154                        let mut literals = Vec::new();
155                        while the_same.last() == Some(&true) {
156                            the_same.pop();
157                            let datum = data.pop().unwrap();
158                            let typum = typ.column_types.pop().unwrap();
159                            literals.push(MirScalarExpr::literal_ok(datum, typum.scalar_type));
160                        }
161                        literals.reverse();
162
163                        // Any subset of constant values can be extracted with a permute.
164                        let remaining_common_literals = the_same.iter().filter(|e| **e).count();
165                        if remaining_common_literals > 0 {
166                            let final_arity = the_same.len() - remaining_common_literals;
167                            let mut projected_literals = Vec::new();
168                            let mut projection = Vec::new();
169                            let mut new_column_types = Vec::new();
170                            for (i, sameness) in the_same.iter().enumerate() {
171                                if *sameness {
172                                    projection.push(final_arity + projected_literals.len());
173                                    projected_literals.push(MirScalarExpr::literal_ok(
174                                        data[i],
175                                        typ.column_types[i].scalar_type.clone(),
176                                    ));
177                                } else {
178                                    projection.push(new_column_types.len());
179                                    new_column_types.push(typ.column_types[i].clone());
180                                }
181                            }
182                            typ.column_types = new_column_types;
183
184                            // Tidy up the type information of `relation`.
185                            for key in typ.keys.iter_mut() {
186                                *key = key
187                                    .iter()
188                                    .filter(|x| !the_same.get(**x).unwrap_or(&true))
189                                    .map(|x| projection[*x])
190                                    .collect::<Vec<usize>>();
191                            }
192                            typ.keys.sort();
193                            typ.keys.dedup();
194
195                            let remove_extracted_literals = |row: &mut Row| {
196                                let mut new_row = Row::default();
197                                let mut packer = new_row.packer();
198                                let data = row.unpack();
199                                for i in 0..the_same.len() {
200                                    if !the_same[i] {
201                                        packer.push(data[i]);
202                                    }
203                                }
204                                *row = new_row;
205                            };
206
207                            remove_extracted_literals(row);
208                            for (row, _cnt) in rows.iter_mut() {
209                                remove_extracted_literals(row);
210                            }
211
212                            *relation = relation
213                                .take_dangerous()
214                                .map(projected_literals)
215                                .project(projection);
216                        } else if !literals.is_empty() {
217                            // Tidy up the type information of `relation`.
218                            for key in typ.keys.iter_mut() {
219                                key.retain(|k| k < &data.len());
220                            }
221                            typ.keys.sort();
222                            typ.keys.dedup();
223
224                            RowPacker::for_existing_row(row).truncate_datums(typ.arity());
225                            for (row, _cnt) in rows.iter_mut() {
226                                RowPacker::for_existing_row(row).truncate_datums(typ.arity());
227                            }
228                        }
229
230                        Ok(literals)
231                    } else {
232                        Ok(Vec::new())
233                    }
234                }
235                MirRelationExpr::Get { id, typ, .. } => {
236                    // A get expression may need to have literal expressions appended to it.
237                    let literals = gets.get(id).cloned().unwrap_or_else(Vec::new);
238                    if !literals.is_empty() {
239                        // Correct the type of the `Get`, which has fewer columns,
240                        // and not the same fields in its keys. It is ok to remove
241                        // any columns from the keys, as them being literals meant
242                        // that their distinctness was not what made anything a key.
243                        for _ in 0..literals.len() {
244                            typ.column_types.pop();
245                        }
246                        let columns = typ.column_types.len();
247                        for key in typ.keys.iter_mut() {
248                            key.retain(|k| k < &columns);
249                        }
250                        typ.keys.sort();
251                        typ.keys.dedup();
252                    }
253                    Ok(literals)
254                }
255                MirRelationExpr::Let { id, value, body } => {
256                    // Any literals appended to the `value` should be used
257                    // at corresponding `Get`s throughout the `body`.
258                    let literals = self.action(value, gets)?;
259                    let id = Id::Local(*id);
260                    if !literals.is_empty() {
261                        let prior = gets.insert(id, literals);
262                        assert!(!prior.is_some());
263                    }
264                    let result = self.action(body, gets);
265                    gets.remove(&id);
266                    result
267                }
268                MirRelationExpr::LetRec {
269                    ids,
270                    values,
271                    limits: _,
272                    body,
273                } => {
274                    let recursive_ids = MirRelationExpr::recursive_ids(ids, values);
275
276                    // Extend the context with empty `literals` vectors for all
277                    // recursive IDs.
278                    for local_id in ids.iter() {
279                        if recursive_ids.contains(local_id) {
280                            let literals = vec![];
281                            let prior = gets.insert(Id::Local(*local_id), literals);
282                            assert!(!prior.is_some());
283                        }
284                    }
285
286                    // Descend into values and extend the context with their
287                    // `literals` results.
288                    for (local_id, value) in zip_eq(ids.iter(), values.iter_mut()) {
289                        let literals = self.action(value, gets)?;
290                        if recursive_ids.contains(local_id) {
291                            // Literals lifted from a recursive binding should
292                            // be re-installed at the top of the value.
293                            if !literals.is_empty() {
294                                *value = value.take_dangerous().map(literals);
295                            }
296                        } else {
297                            // Literals lifted from a non-recursive binding can
298                            // propagate to its call sites.
299                            let prior = gets.insert(Id::Local(*local_id), literals);
300                            assert!(!prior.is_some());
301                        }
302                    }
303
304                    // Descend into body.
305                    let result = self.action(body, gets)?;
306
307                    // Remove all enclosing IDs from the context before
308                    // returning the result.
309                    for id in ids.iter() {
310                        gets.remove(&Id::Local(*id));
311                    }
312
313                    Ok(result)
314                }
315                MirRelationExpr::Project { input, outputs } => {
316                    // We do not want to lift literals around projections.
317                    // Projections are the highest lifted operator and lifting
318                    // literals around projections could cause us to fail to
319                    // reach a fixed point under the transformations.
320                    let mut literals = self.action(input, gets)?;
321                    if !literals.is_empty() {
322                        let input_arity = input.arity();
323                        // For each input literal contains a vector with the `output` positions
324                        // that references it. By putting data into a Vec and sorting, we
325                        // guarantee a reliable order.
326                        let mut used_literals = outputs
327                            .iter()
328                            .enumerate()
329                            .filter(|(_, x)| **x >= input_arity)
330                            .map(|(out_col, old_in_col)| (old_in_col - input_arity, out_col))
331                            // group them to avoid adding duplicated literals
332                            .into_group_map()
333                            .drain()
334                            .collect::<Vec<_>>();
335
336                        if used_literals.len() != literals.len() {
337                            used_literals.sort();
338                            // Discard literals that are not projected
339                            literals = used_literals
340                                .iter()
341                                .map(|(old_in_col, _)| literals[*old_in_col].clone())
342                                .collect::<Vec<_>>();
343                            // Update the references to the literal in `output`
344                            for (new_in_col, (_old_in_col, out_cols)) in
345                                used_literals.iter().enumerate()
346                            {
347                                for out_col in out_cols {
348                                    outputs[*out_col] = input_arity + new_in_col;
349                                }
350                            }
351                        }
352
353                        // If the literals need to be re-interleaved,
354                        // we don't have much choice but to install a
355                        // Map operator to do that under the project.
356                        // Ideally this doesn't happen much, as projects
357                        // get lifted too.
358                        if !literals.is_empty() {
359                            **input = input.take_dangerous().map(literals);
360                        }
361                    }
362                    // Policy: Do not lift literals around projects.
363                    Ok(Vec::new())
364                }
365                MirRelationExpr::Map { input, scalars } => {
366                    let mut literals = self.action(input, gets)?;
367
368                    // Make the map properly formed again.
369                    literals.extend(scalars.iter().cloned());
370                    *scalars = literals;
371
372                    // Strip off literals at the end of `scalars`.
373                    let mut result = Vec::new();
374                    while scalars.last().map(|e| e.is_literal()) == Some(true) {
375                        result.push(scalars.pop().unwrap());
376                    }
377                    result.reverse();
378
379                    if scalars.is_empty() {
380                        *relation = input.take_dangerous();
381                    } else {
382                        // Permute columns to put literals at end, if any, hope project lifted.
383                        let literal_count = scalars.iter().filter(|e| e.is_literal()).count();
384                        if literal_count != 0 {
385                            let input_arity = input.arity();
386                            let first_literal_id = input_arity + scalars.len() - literal_count;
387                            let mut new_scalars = Vec::new();
388                            let mut projected_literals = Vec::new();
389                            let mut projection = (0..input_arity).collect::<Vec<usize>>();
390                            for scalar in scalars.iter_mut() {
391                                if scalar.is_literal() {
392                                    projection.push(first_literal_id + projected_literals.len());
393                                    projected_literals.push(scalar.clone());
394                                } else {
395                                    let mut cloned_scalar = scalar.clone();
396                                    // Propagate literals through expressions and remap columns.
397                                    cloned_scalar.visit_mut_post(&mut |e| {
398                                        if let MirScalarExpr::Column(old_id) = e {
399                                            let new_id = projection[*old_id];
400                                            if new_id >= first_literal_id {
401                                                *e = projected_literals[new_id - first_literal_id]
402                                                    .clone();
403                                            } else {
404                                                *old_id = new_id;
405                                            }
406                                        }
407                                    })?;
408                                    projection.push(input_arity + new_scalars.len());
409                                    new_scalars.push(cloned_scalar);
410                                }
411                            }
412                            new_scalars.extend(projected_literals);
413                            *relation = input.take_dangerous().map(new_scalars).project(projection);
414                        }
415                    }
416
417                    Ok(result)
418                }
419                MirRelationExpr::FlatMap { input, func, exprs } => {
420                    let literals = self.action(input, gets)?;
421                    if !literals.is_empty() {
422                        let input_arity = input.arity();
423                        for expr in exprs.iter_mut() {
424                            expr.visit_mut_post(&mut |e| {
425                                if let MirScalarExpr::Column(c) = e {
426                                    if *c >= input_arity {
427                                        *e = literals[*c - input_arity].clone();
428                                    }
429                                }
430                            })?;
431                        }
432                        // Permute the literals around the columns added by FlatMap
433                        let mut projection = (0..input_arity).collect::<Vec<usize>>();
434                        let func_arity = func.output_arity();
435                        projection
436                            .extend((0..literals.len()).map(|x| input_arity + func_arity + x));
437                        projection.extend((0..func_arity).map(|x| input_arity + x));
438
439                        *relation = relation.take_dangerous().map(literals).project(projection);
440                    }
441                    Ok(Vec::new())
442                }
443                MirRelationExpr::Filter { input, predicates } => {
444                    let literals = self.action(input, gets)?;
445                    if !literals.is_empty() {
446                        // We should be able to instantiate all uses of `literals`
447                        // in predicates and then lift the `map` around the filter.
448                        let input_arity = input.arity();
449                        for expr in predicates.iter_mut() {
450                            expr.visit_mut_post(&mut |e| {
451                                if let MirScalarExpr::Column(c) = e {
452                                    if *c >= input_arity {
453                                        *e = literals[*c - input_arity].clone();
454                                    }
455                                }
456                            })?;
457                        }
458                    }
459                    Ok(literals)
460                }
461                MirRelationExpr::Join {
462                    inputs,
463                    equivalences,
464                    implementation,
465                } => {
466                    if !matches!(implementation, IndexedFilter(..)) {
467                        // before lifting, save the original shape of the inputs
468                        let old_input_mapper = JoinInputMapper::new(inputs);
469
470                        // lift literals from each input
471                        let mut input_literals = Vec::new();
472                        for mut input in inputs.iter_mut() {
473                            let literals = self.action(input, gets)?;
474
475                            // Do not propagate error literals beyond join inputs, since that may result
476                            // in them being propagated to other inputs of the join and evaluated when
477                            // they should not.
478                            if literals.iter().any(|l| l.is_literal_err()) {
479                                // Push the literal errors beyond any arrangement since otherwise JoinImplementation
480                                // would add another arrangement on top leading to an infinite loop/stack overflow.
481                                if let MirRelationExpr::ArrangeBy { input, .. } = &mut input {
482                                    **input = input.take_dangerous().map(literals);
483                                } else {
484                                    *input = input.take_dangerous().map(literals);
485                                }
486                                input_literals.push(Vec::new());
487                            } else {
488                                input_literals.push(literals);
489                            }
490                        }
491
492                        if input_literals.iter().any(|l| !l.is_empty()) {
493                            *implementation = mz_expr::JoinImplementation::Unimplemented;
494
495                            // We should be able to install any literals in the
496                            // equivalence relations, and then lift all literals
497                            // around the join using a project to re-order columns.
498
499                            // Visit each expression in each equivalence class to either
500                            // inline literals or update column references.
501                            let new_input_mapper = JoinInputMapper::new(inputs);
502                            for equivalence in equivalences.iter_mut() {
503                                for expr in equivalence.iter_mut() {
504                                    expr.visit_mut_post(&mut |e| {
505                                        if let MirScalarExpr::Column(c) = e {
506                                            let (col, input) =
507                                                old_input_mapper.map_column_to_local(*c);
508                                            if col >= new_input_mapper.input_arity(input) {
509                                                // the column refers to a literal that
510                                                // has been promoted. inline it
511                                                *e = input_literals[input]
512                                                    [col - new_input_mapper.input_arity(input)]
513                                                .clone()
514                                            } else {
515                                                // localize to the new join
516                                                *c = new_input_mapper
517                                                    .map_column_to_global(col, input);
518                                            }
519                                        }
520                                    })?;
521                                }
522                            }
523
524                            // We now determine a projection to shovel around all of
525                            // the columns that puts the literals last. Where this is optional
526                            // for other operators, it is mandatory here if we want to lift the
527                            // literals through the join.
528
529                            // The first literal column number starts at the last column
530                            // of the new join. Increment the column number as literals
531                            // get added.
532                            let mut literal_column_number = new_input_mapper.total_columns();
533                            let mut projection = Vec::new();
534                            for input in 0..old_input_mapper.total_inputs() {
535                                for column in old_input_mapper.local_columns(input) {
536                                    if column >= new_input_mapper.input_arity(input) {
537                                        projection.push(literal_column_number);
538                                        literal_column_number += 1;
539                                    } else {
540                                        projection.push(
541                                            new_input_mapper.map_column_to_global(column, input),
542                                        );
543                                    }
544                                }
545                            }
546
547                            let literals = input_literals.into_iter().flatten().collect::<Vec<_>>();
548
549                            // Bubble up literals if the projection is the
550                            // identity.
551                            if projection.iter().enumerate().all(|(col, &pos)| col == pos) {
552                                return Ok(literals);
553                            }
554
555                            // Otherwise add map(literals) + project(projection)
556                            // and bubble up an empty literals vector.
557                            *relation = relation.take_dangerous().map(literals).project(projection);
558                        }
559                    }
560                    Ok(Vec::new())
561                }
562                MirRelationExpr::Reduce {
563                    input,
564                    group_key,
565                    aggregates,
566                    monotonic: _,
567                    expected_group_size: _,
568                } => {
569                    let literals = self.action(input, gets)?;
570                    if !literals.is_empty() {
571                        // Reduce absorbs maps, and we should inline literals.
572                        let input_arity = input.arity();
573                        // Inline literals into group key expressions.
574                        for expr in group_key.iter_mut() {
575                            expr.visit_mut_post(&mut |e| {
576                                if let MirScalarExpr::Column(c) = e {
577                                    if *c >= input_arity {
578                                        *e = literals[*c - input_arity].clone();
579                                    }
580                                }
581                            })?;
582                        }
583                        // Inline literals into aggregate value selector expressions.
584                        for aggr in aggregates.iter_mut() {
585                            aggr.expr.visit_mut_post(&mut |e| {
586                                if let MirScalarExpr::Column(c) = e {
587                                    if *c >= input_arity {
588                                        *e = literals[*c - input_arity].clone();
589                                    }
590                                }
591                            })?;
592                        }
593                    }
594
595                    let eval_constant_aggr = |aggr: &mz_expr::AggregateExpr| {
596                        let temp = mz_repr::RowArena::new();
597                        let mut eval = aggr.expr.eval(&[], &temp);
598                        if let Ok(param) = eval {
599                            eval = Ok(aggr.func.eval(Some(param), &temp));
600                        }
601                        MirScalarExpr::literal(
602                            eval,
603                            // This type information should be available in the `a.expr` literal,
604                            // but extracting it with pattern matching seems awkward.
605                            aggr.func.output_type(aggr.expr.typ(&[])).scalar_type,
606                        )
607                    };
608
609                    // The only literals we think we can lift are those that are
610                    // independent of the number of records; things like `Any`, `All`,
611                    // `Min`, and `Max`.
612                    let mut result = Vec::new();
613                    while aggregates.last().map(|a| a.is_constant()) == Some(true) {
614                        let aggr = aggregates.pop().unwrap();
615                        result.push(eval_constant_aggr(&aggr));
616                    }
617                    if aggregates.is_empty() {
618                        while group_key.last().map(|k| k.is_literal()) == Some(true) {
619                            let key = group_key.pop().unwrap();
620                            result.push(key);
621                        }
622                    }
623                    result.reverse();
624
625                    // Add a Map operator with the remaining literals so that they are lifted in
626                    // the next invocation of this transform.
627                    let non_literal_keys = group_key.iter().filter(|x| !x.is_literal()).count();
628                    let non_constant_aggr = aggregates.iter().filter(|x| !x.is_constant()).count();
629                    if non_literal_keys != group_key.len() || non_constant_aggr != aggregates.len()
630                    {
631                        let first_projected_literal: usize = non_literal_keys + non_constant_aggr;
632                        let mut projection = Vec::new();
633                        let mut projected_literals = Vec::new();
634
635                        let mut new_group_key = Vec::new();
636                        for key in group_key.drain(..) {
637                            if key.is_literal() {
638                                projection.push(first_projected_literal + projected_literals.len());
639                                projected_literals.push(key);
640                            } else {
641                                projection.push(new_group_key.len());
642                                new_group_key.push(key);
643                            }
644                        }
645                        // The new group key without literals
646                        *group_key = new_group_key;
647
648                        let mut new_aggregates = Vec::new();
649                        for aggr in aggregates.drain(..) {
650                            if aggr.is_constant() {
651                                projection.push(first_projected_literal + projected_literals.len());
652                                projected_literals.push(eval_constant_aggr(&aggr));
653                            } else {
654                                projection.push(group_key.len() + new_aggregates.len());
655                                new_aggregates.push(aggr);
656                            }
657                        }
658                        // The new aggregates without constant ones
659                        *aggregates = new_aggregates;
660
661                        *relation = relation
662                            .take_dangerous()
663                            .map(projected_literals)
664                            .project(projection);
665                    }
666                    Ok(result)
667                }
668                MirRelationExpr::TopK {
669                    input,
670                    group_key,
671                    order_key,
672                    limit,
673                    offset: _,
674                    monotonic: _,
675                    expected_group_size: _,
676                } => {
677                    let literals = self.action(input, gets)?;
678                    if !literals.is_empty() {
679                        // We should be able to lift literals out, as they affect neither
680                        // grouping nor ordering. We should discard grouping and ordering
681                        // that references the columns, though.
682                        let input_arity = input.arity();
683                        group_key.retain(|c| *c < input_arity);
684                        order_key.retain(|o| o.column < input_arity);
685                        // Inline literals into the limit expression.
686                        if let Some(limit) = limit {
687                            limit.visit_mut_post(&mut |e| {
688                                if let MirScalarExpr::Column(c) = e {
689                                    if *c >= input_arity {
690                                        *e = literals[*c - input_arity].clone();
691                                    }
692                                }
693                            })?;
694                        }
695                    }
696                    Ok(literals)
697                }
698                MirRelationExpr::Negate { input } => {
699                    // Literals can just be lifted out of negate.
700                    self.action(input, gets)
701                }
702                MirRelationExpr::Threshold { input } => {
703                    // Literals can just be lifted out of threshold.
704                    self.action(input, gets)
705                }
706                MirRelationExpr::Union { base, inputs } => {
707                    let mut base_literals = self.action(base, gets)?;
708
709                    let mut input_literals = vec![];
710                    for input in inputs.iter_mut() {
711                        input_literals.push(self.action(input, gets)?)
712                    }
713
714                    // We need to find the longest common suffix between all the arms of the union.
715                    let mut suffix = Vec::new();
716                    while !base_literals.is_empty()
717                        && input_literals
718                            .iter()
719                            .all(|lits| lits.last() == base_literals.last())
720                    {
721                        // Every arm agrees on the last value, so push it onto the shared suffix and
722                        // remove it from each arm.
723                        suffix.push(base_literals.last().unwrap().clone());
724                        base_literals.pop();
725                        for lits in input_literals.iter_mut() {
726                            lits.pop();
727                        }
728                    }
729
730                    // Because we pushed stuff onto the vector like a stack, we need to reverse it now.
731                    suffix.reverse();
732
733                    // Any remaining literals for each expression must be appended to that expression,
734                    // while the shared suffix is returned to continue traveling upwards.
735                    if !base_literals.is_empty() {
736                        **base = base.take_dangerous().map(base_literals);
737                    }
738                    for (input, literals) in inputs.iter_mut().zip_eq(input_literals) {
739                        if !literals.is_empty() {
740                            *input = input.take_dangerous().map(literals);
741                        }
742                    }
743                    Ok(suffix)
744                }
745                MirRelationExpr::ArrangeBy { input, keys } => {
746                    // TODO(frank): Not sure if this is the right behavior,
747                    // as we disrupt the set of used arrangements. Though,
748                    // we are probably most likely to use arranged `Get`
749                    // operators rather than those decorated with maps.
750                    let literals = self.action(input, gets)?;
751                    if !literals.is_empty() {
752                        let input_arity = input.arity();
753                        for key in keys.iter_mut() {
754                            for expr in key.iter_mut() {
755                                expr.visit_mut_post(&mut |e| {
756                                    if let MirScalarExpr::Column(c) = e {
757                                        if *c >= input_arity {
758                                            *e = literals[*c - input_arity].clone();
759                                        }
760                                    }
761                                })?;
762                            }
763                        }
764                    }
765                    Ok(literals)
766                }
767            }
768        })
769    }
770}