Skip to main content

mz_expr/
linear.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9use std::collections::{BTreeMap, BTreeSet};
10use std::fmt::Display;
11
12use mz_repr::{Datum, Row};
13use serde::{Deserialize, Serialize};
14
15use crate::scalar::optimizable::OptimizableExpr;
16use crate::visit::Visit;
17use crate::{MirRelationExpr, MirScalarExpr};
18
/// A compound operator that can be applied row-by-row.
///
/// This operator integrates the map, filter, and project operators.
/// It applies a sequence of map expressions, which are allowed to
/// refer to previous expressions, interleaved with predicates which
/// must be satisfied for an output to be produced. If all predicates
/// evaluate to `Datum::True` the data at the identified columns are
/// collected and produced as output in a packed `Row`.
///
/// This operator is a "builder" and its contents may contain expressions
/// that are not yet executable. For example, it may contain temporal
/// expressions in `self.expressions`, even though this is not something
/// we can directly evaluate. The plan creation methods will defensively
/// ensure that the right thing happens.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    Hash,
    Ord,
    PartialOrd
)]
// Use `E: DeserializeOwned` as the bound of the derived `Deserialize` impl,
// in place of the bound serde would otherwise infer from the type parameter.
#[serde(bound(deserialize = "E: serde::de::DeserializeOwned"))]
pub struct MapFilterProject<E: OptimizableExpr = MirScalarExpr> {
    /// A sequence of expressions that should be appended to the row.
    ///
    /// Expression `i` produces column `input_arity + i`, and may refer to
    /// input columns as well as to columns produced by earlier expressions.
    ///
    /// Many of these expressions may not be produced in the output,
    /// and may only be present as common subexpressions.
    pub expressions: Vec<E>,
    /// Expressions that must evaluate to `Datum::True` for the output
    /// row to be produced.
    ///
    /// Each entry is prepended with a column identifier indicating
    /// the column *before* which the predicate should first be applied.
    /// Most commonly this would be one plus the largest column identifier
    /// in the predicate's support, but it could be larger to implement
    /// guarded evaluation of predicates.
    ///
    /// This list should be sorted by the first field. `filter` maintains this
    /// order, except that it moves predicates that are literal errors to the end.
    pub predicates: Vec<(usize, E)>,
    /// A sequence of column identifiers whose data form the output row.
    ///
    /// Identifiers below `input_arity` name input columns; larger identifiers
    /// name the columns produced by `expressions`.
    pub projection: Vec<usize>,
    /// The expected number of input columns.
    ///
    /// This is needed to ensure correct identification of newly formed
    /// columns in the output.
    pub input_arity: usize,
}
70
71impl<E: OptimizableExpr + Display> Display for MapFilterProject<E> {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        writeln!(f, "MapFilterProject(")?;
74        writeln!(f, "  expressions:")?;
75        self.expressions
76            .iter()
77            .enumerate()
78            .try_for_each(|(i, e)| writeln!(f, "    #{} <- {},", i + self.input_arity, e))?;
79        writeln!(f, "  predicates:")?;
80        self.predicates
81            .iter()
82            .try_for_each(|(before, p)| writeln!(f, "    <before: {}> {},", before, p))?;
83        writeln!(f, "  projection: {:?}", self.projection)?;
84        writeln!(f, "  input_arity: {}", self.input_arity)?;
85        writeln!(f, ")")
86    }
87}
88
89impl<E: OptimizableExpr> MapFilterProject<E> {
90    /// Create a no-op operator for an input of a supplied arity.
91    pub fn new(input_arity: usize) -> Self {
92        Self {
93            expressions: Vec::new(),
94            predicates: Vec::new(),
95            projection: (0..input_arity).collect(),
96            input_arity,
97        }
98    }
99
100    /// Given two mfps, return an mfp that applies one
101    /// followed by the other.
102    /// Note that the arguments are in the opposite order
103    /// from how function composition is usually written in mathematics.
104    pub fn compose(before: Self, after: Self) -> Self {
105        let (m, f, p) = after.into_map_filter_project();
106        before.map(m).filter(f).project(p)
107    }
108
109    /// True if the operator describes the identity transformation.
110    pub fn is_identity(&self) -> bool {
111        self.expressions.is_empty()
112            && self.predicates.is_empty()
113            && self.projection.len() == self.input_arity
114            && self.projection.iter().enumerate().all(|(i, p)| i == *p)
115    }
116
117    /// Retain only the indicated columns in the presented order.
118    pub fn project<I>(mut self, columns: I) -> Self
119    where
120        I: IntoIterator<Item = usize> + std::fmt::Debug,
121    {
122        self.projection = columns.into_iter().map(|c| self.projection[c]).collect();
123        self
124    }
125
126    /// Retain only rows satisfying these predicates.
127    ///
128    /// This method introduces predicates as eagerly as they can be evaluated,
129    /// which may not be desired for predicates that may cause exceptions.
130    /// If fine manipulation is required, the predicates can be added manually.
131    pub fn filter<I>(mut self, predicates: I) -> Self
132    where
133        I: IntoIterator<Item = E>,
134    {
135        for mut predicate in predicates {
136            // Correct column references.
137            predicate.permute(&self.projection[..]);
138
139            // Validate column references.
140            assert!(
141                predicate
142                    .support()
143                    .into_iter()
144                    .all(|c| c < self.input_arity + self.expressions.len())
145            );
146
147            // Insert predicate as eagerly as it can be evaluated:
148            // just after the largest column in its support is formed.
149            let max_support = predicate
150                .support()
151                .into_iter()
152                .max()
153                .map(|c| c + 1)
154                .unwrap_or(0);
155            self.predicates.push((max_support, predicate))
156        }
157        // Stable sort predicates by position at which they take effect.
158        // We put literal errors at the end as a stop-gap to avoid erroring
159        // before we are able to evaluate any predicates that might prevent it.
160        self.predicates
161            .sort_by_key(|(position, predicate)| (E::is_literal_err(predicate), *position));
162        self
163    }
164
165    /// Append the result of evaluating expressions to each row.
166    pub fn map<I>(mut self, expressions: I) -> Self
167    where
168        I: IntoIterator<Item = E>,
169    {
170        for mut expression in expressions {
171            // Correct column references.
172            expression.permute(&self.projection[..]);
173
174            // Validate column references.
175            assert!(
176                expression
177                    .support()
178                    .into_iter()
179                    .all(|c| c < self.input_arity + self.expressions.len())
180            );
181
182            // Introduce expression and produce as output.
183            self.expressions.push(expression);
184            self.projection
185                .push(self.input_arity + self.expressions.len() - 1);
186        }
187
188        self
189    }
190
191    /// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning.
192    pub fn into_map_filter_project(self) -> (Vec<E>, Vec<E>, Vec<usize>) {
193        let predicates = self
194            .predicates
195            .into_iter()
196            .map(|(_pos, predicate)| predicate)
197            .collect();
198        (self.expressions, predicates, self.projection)
199    }
200
201    /// As the arguments to `Map`, `Filter`, and `Project` operators.
202    ///
203    /// In principle, this operator can be implemented as a sequence of
204    /// more elemental operators, likely less efficiently.
205    pub fn as_map_filter_project(&self) -> (Vec<E>, Vec<E>, Vec<usize>) {
206        self.clone().into_map_filter_project()
207    }
208}
209
210// Methods that are specific to MirScalarExpr (use MirRelationExpr or as_literal).
211impl MapFilterProject<MirScalarExpr> {
212    /// Determines if a scalar expression must be equal to a literal datum.
213    pub fn literal_constraint(&self, expr: &MirScalarExpr) -> Option<Datum<'_>> {
214        for (_pos, predicate) in self.predicates.iter() {
215            if let MirScalarExpr::CallBinary {
216                func: crate::BinaryFunc::Eq(_),
217                expr1,
218                expr2,
219            } = predicate
220            {
221                if let Some(Ok(datum1)) = expr1.as_literal() {
222                    if &**expr2 == expr {
223                        return Some(datum1);
224                    }
225                }
226                if let Some(Ok(datum2)) = expr2.as_literal() {
227                    if &**expr1 == expr {
228                        return Some(datum2);
229                    }
230                }
231            }
232        }
233        None
234    }
235
236    /// Determines if a sequence of scalar expressions must be equal to a literal row.
237    ///
238    /// This method returns `None` on an empty `exprs`, which might be surprising, but
239    /// seems to line up with its callers' expectations of that being a non-constraint.
240    /// The caller knows if `exprs` is empty, and can modify their behavior appropriately.
241    /// if they would rather have a literal empty row.
242    pub fn literal_constraints(&self, exprs: &[MirScalarExpr]) -> Option<Row> {
243        if exprs.is_empty() {
244            return None;
245        }
246        let mut row = Row::default();
247        let mut packer = row.packer();
248        for expr in exprs {
249            if let Some(literal) = self.literal_constraint(expr) {
250                packer.push(literal);
251            } else {
252                return None;
253            }
254        }
255        Some(row)
256    }
257
258    /// Extracts any MapFilterProject at the root of the expression.
259    ///
260    /// The expression will be modified to extract any maps, filters, and
261    /// projections, which will be returned as `Self`. If there are no maps,
262    /// filters, or projections the method will return an identity operator.
263    ///
264    /// The extracted expressions may contain temporal predicates, and one
265    /// should be careful to apply them blindly.
266    pub fn extract_from_expression(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
267        // TODO: This could become iterative rather than recursive if
268        // we were able to fuse MFP operators from below, rather than
269        // from above.
270        match expr {
271            MirRelationExpr::Map { input, scalars } => {
272                let (mfp, expr) = Self::extract_from_expression(input);
273                (mfp.map(scalars.iter().cloned()), expr)
274            }
275            MirRelationExpr::Filter { input, predicates } => {
276                let (mfp, expr) = Self::extract_from_expression(input);
277                (mfp.filter(predicates.iter().cloned()), expr)
278            }
279            MirRelationExpr::Project { input, outputs } => {
280                let (mfp, expr) = Self::extract_from_expression(input);
281                (mfp.project(outputs.iter().cloned()), expr)
282            }
283            // TODO: The recursion is quadratic in the number of Map/Filter/Project operators due to
284            // this call to `arity()`.
285            x => (Self::new(x.arity()), x),
286        }
287    }
288
289    /// Extracts an error-free MapFilterProject at the root of the expression.
290    ///
291    /// The expression will be modified to extract maps, filters, and projects
292    /// from the root of the expression, which will be returned as `Self`. The
293    /// extraction will halt if a Map or Filter containing a literal error is
294    /// reached. Otherwise, the method will return an identity operator.
295    ///
296    /// This method is meant to be used during optimization, where it is
297    /// necessary to avoid moving around maps and filters with errors.
298    pub fn extract_non_errors_from_expr(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
299        match expr {
300            MirRelationExpr::Map { input, scalars }
301                if scalars.iter().all(|s| !s.is_literal_err()) =>
302            {
303                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
304                (mfp.map(scalars.iter().cloned()), expr)
305            }
306            MirRelationExpr::Filter { input, predicates }
307                if predicates.iter().all(|p| !p.is_literal_err()) =>
308            {
309                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
310                (mfp.filter(predicates.iter().cloned()), expr)
311            }
312            MirRelationExpr::Project { input, outputs } => {
313                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
314                (mfp.project(outputs.iter().cloned()), expr)
315            }
316            x => (Self::new(x.arity()), x),
317        }
318    }
319
320    /// Extracts an error-free MapFilterProject at the root of the expression.
321    ///
322    /// Differs from [MapFilterProject::extract_non_errors_from_expr] by taking and returning a
323    /// mutable reference.
324    pub fn extract_non_errors_from_expr_ref_mut(
325        expr: &mut MirRelationExpr,
326    ) -> (Self, &mut MirRelationExpr) {
327        // This is essentially the same code as `extract_non_errors_from_expr`, except the seemingly
328        // superfluous outer if, which works around a borrow-checker issue:
329        // https://github.com/rust-lang/rust/issues/54663
330        if matches!(
331            expr,
332            MirRelationExpr::Map { input: _, scalars }
333                if scalars.iter().all(|s| !s.is_literal_err())
334        ) || matches!(
335            expr,
336            MirRelationExpr::Filter { input: _, predicates }
337                if predicates.iter().all(|p| !p.is_literal_err())
338        ) || matches!(expr, MirRelationExpr::Project { .. })
339        {
340            match expr {
341                MirRelationExpr::Map { input, scalars }
342                    if scalars.iter().all(|s| !s.is_literal_err()) =>
343                {
344                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
345                    (mfp.map(scalars.iter().cloned()), expr)
346                }
347                MirRelationExpr::Filter { input, predicates }
348                    if predicates.iter().all(|p| !p.is_literal_err()) =>
349                {
350                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
351                    (mfp.filter(predicates.iter().cloned()), expr)
352                }
353                MirRelationExpr::Project { input, outputs } => {
354                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
355                    (mfp.project(outputs.iter().cloned()), expr)
356                }
357                _ => unreachable!(),
358            }
359        } else {
360            (Self::new(expr.arity()), expr)
361        }
362    }
363
364    /// Removes an error-free MapFilterProject from the root of the expression.
365    ///
366    /// The expression will be modified to extract maps, filters, and projects
367    /// from the root of the expression, which will be returned as `Self`. The
368    /// extraction will halt if a Map or Filter containing a literal error is
369    /// reached. Otherwise, the method will return an
370    /// identity operator, and the expression will remain unchanged.
371    ///
372    /// This method is meant to be used during optimization, where it is
373    /// necessary to avoid moving around maps and filters with errors.
374    pub fn extract_non_errors_from_expr_mut(expr: &mut MirRelationExpr) -> Self {
375        match expr {
376            MirRelationExpr::Map { input, scalars }
377                if scalars.iter().all(|s| !s.is_literal_err()) =>
378            {
379                let mfp =
380                    Self::extract_non_errors_from_expr_mut(input).map(scalars.iter().cloned());
381                *expr = input.take_dangerous();
382                mfp
383            }
384            MirRelationExpr::Filter { input, predicates }
385                if predicates.iter().all(|p| !p.is_literal_err()) =>
386            {
387                let mfp = Self::extract_non_errors_from_expr_mut(input)
388                    .filter(predicates.iter().cloned());
389                *expr = input.take_dangerous();
390                mfp
391            }
392            MirRelationExpr::Project { input, outputs } => {
393                let mfp =
394                    Self::extract_non_errors_from_expr_mut(input).project(outputs.iter().cloned());
395                *expr = input.take_dangerous();
396                mfp
397            }
398            x => Self::new(x.arity()),
399        }
400    }
401}
402
403impl<E: OptimizableExpr> MapFilterProject<E> {
404    /// Returns `true` if any predicate in this MFP contains a temporal expression (`mz_now()`).
405    pub fn has_temporal_predicates(&self) -> bool {
406        self.predicates
407            .iter()
408            .any(|(_, predicate)| OptimizableExpr::contains_temporal(predicate))
409    }
410
411    /// Extracts temporal predicates into their own `Self`.
412    ///
413    /// Expressions that are used by the temporal predicates are exposed by `self.projection`,
414    /// though there could be justification for extracting them as well if they are otherwise
415    /// unused.
416    ///
417    /// This separation is valuable when the execution cannot be fused into one operator.
418    pub fn extract_temporal(&mut self) -> Self {
419        // Optimize the expression, as it is only post-optimization that we can be certain
420        // that temporal expressions are restricted to filters. We could relax this in the
421        // future to be only `inline_expressions` and `remove_undemanded`, but optimization
422        // seems to be the best fit at the moment.
423        self.optimize();
424
425        // Assert that we no longer have temporal expressions to evaluate. This should only
426        // occur if the optimization above results with temporal expressions yielded in the
427        // output, which is out of spec for how the type is meant to be used.
428        assert!(
429            !self
430                .expressions
431                .iter()
432                .any(|e| OptimizableExpr::contains_temporal(e))
433        );
434
435        // Extract temporal predicates from `self.predicates`.
436        let mut temporal_predicates = Vec::new();
437        self.predicates.retain(|(_position, predicate)| {
438            if OptimizableExpr::contains_temporal(predicate) {
439                temporal_predicates.push(predicate.clone());
440                false
441            } else {
442                true
443            }
444        });
445
446        // Determine extended input columns used by temporal filters.
447        let mut support = BTreeSet::new();
448        for predicate in temporal_predicates.iter() {
449            support.extend(predicate.support());
450        }
451
452        // Discover the locations of these columns after `self.projection`.
453        let old_projection_len = self.projection.len();
454        let mut new_location = BTreeMap::new();
455        for original in support.iter() {
456            if let Some(position) = self.projection.iter().position(|x| x == original) {
457                new_location.insert(*original, position);
458            } else {
459                new_location.insert(*original, self.projection.len());
460                self.projection.push(*original);
461            }
462        }
463        // Permute references in extracted predicates to their new locations.
464        for predicate in temporal_predicates.iter_mut() {
465            predicate.permute_map(&new_location);
466        }
467
468        // Form a new `Self` containing the temporal predicates to return.
469        Self::new(self.projection.len())
470            .filter(temporal_predicates)
471            .project(0..old_projection_len)
472    }
473
474    /// Extracts common expressions from multiple `Self` into a result `Self`.
475    ///
476    /// The argument `mfps` are mutated so that each are functionaly equivalent to their
477    /// corresponding input, when composed atop the resulting `Self`.
478    ///
479    /// The `extract_exprs` argument is temporary, as we roll out the `extract_common_mfp_expressions` flag.
480    pub fn extract_common(mfps: &mut [&mut Self]) -> Self {
481        match mfps.len() {
482            0 => {
483                panic!("Cannot call method on empty arguments");
484            }
485            1 => {
486                let output_arity = mfps[0].projection.len();
487                std::mem::replace(mfps[0], MapFilterProject::new(output_arity))
488            }
489            _ => {
490                // More generally, we convert each mfp to ANF, at which point we can
491                // repeatedly extract atomic expressions that depend only on input
492                // columns, migrate them to an input mfp, and repeat until no such
493                // expressions exist. At this point, we can also migrate predicates
494                // and then determine and push down projections.
495
496                // Prepare a return `Self`.
497                let mut result_mfp = MapFilterProject::new(mfps[0].input_arity);
498
499                // We convert each mfp to ANF, using `memoize_expressions`.
500                for mfp in mfps.iter_mut() {
501                    mfp.memoize_expressions();
502                }
503
504                // We repeatedly extract common expressions, until none remain.
505                let mut done = false;
506                while !done {
507                    // We use references to determine common expressions, and must
508                    // introduce a scope here to drop the borrows before mutation.
509                    let common = {
510                        // The input arity may increase as we iterate, so recapture.
511                        let input_arity = result_mfp.projection.len();
512                        let mut prev: BTreeSet<_> = mfps[0]
513                            .expressions
514                            .iter()
515                            .filter(|e| e.support().last() < Some(&input_arity))
516                            .collect();
517                        let mut next = BTreeSet::default();
518                        for mfp in mfps[1..].iter() {
519                            for expr in mfp.expressions.iter() {
520                                if prev.contains(expr) {
521                                    next.insert(expr);
522                                }
523                            }
524                            std::mem::swap(&mut prev, &mut next);
525                            next.clear();
526                        }
527                        prev.into_iter().cloned().collect::<Vec<_>>()
528                    };
529                    // Without new common expressions, we should terminate the loop.
530                    done = common.is_empty();
531
532                    // Migrate each expression in `common` to `result_mfp`.
533                    for expr in common.into_iter() {
534                        // Update each mfp by removing expr and updating column references.
535                        for mfp in mfps.iter_mut() {
536                            // With `expr` next in `result_mfp`, it is as if we are rotating it to
537                            // be the first expression in `mfp`, and then removing it from `mfp` and
538                            // increasing the input arity of `mfp`.
539                            let arity = result_mfp.projection.len();
540                            let found = mfp.expressions.iter().position(|e| e == &expr).unwrap();
541                            let index = arity + found;
542                            // Column references change due to the rotation from `index` to `arity`.
543                            let action = |c: &mut usize| {
544                                if arity <= *c && *c < index {
545                                    *c += 1;
546                                } else if *c == index {
547                                    *c = arity;
548                                }
549                            };
550                            // Rotate `expr` from `found` to first, and then snip.
551                            // Short circuit by simply removing and incrementing the input arity.
552                            mfp.input_arity += 1;
553                            mfp.expressions.remove(found);
554                            // Update column references in expressions, predicates, and projections.
555                            for e in mfp.expressions.iter_mut() {
556                                e.visit_columns(action);
557                            }
558                            for (o, e) in mfp.predicates.iter_mut() {
559                                e.visit_columns(action);
560                                // Max out the offset for the predicate; optimization will correct.
561                                *o = mfp.input_arity + mfp.expressions.len();
562                            }
563                            for c in mfp.projection.iter_mut() {
564                                action(c);
565                            }
566                        }
567                        // Install the expression and update
568                        result_mfp.expressions.push(expr);
569                        result_mfp.projection.push(result_mfp.projection.len());
570                    }
571                }
572                // As before, but easier: predicates in common to all mfps.
573                let common_preds: Vec<E> = {
574                    let input_arity = result_mfp.projection.len();
575                    let mut prev: BTreeSet<_> = mfps[0]
576                        .predicates
577                        .iter()
578                        .map(|(_, e)| e)
579                        .filter(|e| e.support().last() < Some(&input_arity))
580                        .collect();
581                    let mut next = BTreeSet::default();
582                    for mfp in mfps[1..].iter() {
583                        for (_, expr) in mfp.predicates.iter() {
584                            if prev.contains(expr) {
585                                next.insert(expr);
586                            }
587                        }
588                        std::mem::swap(&mut prev, &mut next);
589                        next.clear();
590                    }
591                    // Expressions in common, that we will append to `result_mfp.expressions`.
592                    prev.into_iter().cloned().collect::<Vec<_>>()
593                };
594                for mfp in mfps.iter_mut() {
595                    mfp.predicates.retain(|(_, p)| !common_preds.contains(p));
596                    mfp.optimize();
597                }
598                result_mfp.predicates.extend(
599                    common_preds
600                        .into_iter()
601                        .map(|e| (result_mfp.projection.len(), e)),
602                );
603
604                // Then, look for unused columns and project them away.
605                let mut common_demand = BTreeSet::new();
606                for mfp in mfps.iter() {
607                    common_demand.extend(mfp.demand());
608                }
609                // columns in `common_demand` must be retained, but others
610                // may be discarded.
611                let common_demand = (0..result_mfp.projection.len())
612                    .filter(|x| common_demand.contains(x))
613                    .collect::<Vec<_>>();
614                let remap = common_demand
615                    .iter()
616                    .cloned()
617                    .enumerate()
618                    .map(|(new, old)| (old, new))
619                    .collect::<BTreeMap<_, _>>();
620                for mfp in mfps.iter_mut() {
621                    mfp.permute_fn(|c| remap[&c], common_demand.len());
622                }
623                result_mfp = result_mfp.project(common_demand);
624
625                // Return the resulting MFP.
626                result_mfp.optimize();
627                result_mfp
628            }
629        }
630    }
631
632    /// Returns `self`, and leaves behind an identity operator that acts on its output.
633    pub fn take(&mut self) -> Self {
634        let mut identity = Self::new(self.projection.len());
635        std::mem::swap(self, &mut identity);
636        identity
637    }
638
639    /// Convert the `MapFilterProject` into a staged evaluation plan.
640    ///
641    /// The main behavior is extract temporal predicates, which cannot be evaluated
642    /// using the standard machinery.
643    pub fn into_plan(self) -> Result<plan::MfpPlan<E>, String> {
644        plan::MfpPlan::create_from(self)
645    }
646}
647
648impl<E: OptimizableExpr> MapFilterProject<E> {
649    /// Partitions `self` into two instances, one of which can be eagerly applied.
650    ///
651    /// The `available` argument indicates which input columns are available (keys)
652    /// and in which positions (values). This information may allow some maps and
653    /// filters to execute. The `input_arity` argument reports the total number of
654    /// input columns (which may include some not present in `available`)
655    ///
656    /// This method partitions `self` in two parts, `(before, after)`, where `before`
657    /// can be applied on columns present as keys in `available`, and `after` must
658    /// await the introduction of the other input columns.
659    ///
660    /// The `before` instance will *append* any columns that can be determined from
661    /// `available` but will project away any of these columns that are not needed by
662    /// `after`. Importantly, this means that `before` will leave intact *all* input
663    /// columns including those not referenced in `available`.
664    ///
665    /// The `after` instance will presume all input columns are available, followed
666    /// by the appended columns of the `before` instance. It may be that some input
667    /// columns can be projected away in `before` if `after` does not need them, but
668    /// we leave that as something the caller can apply if needed (it is otherwise
669    /// complicated to negotiate which input columns `before` should retain).
670    ///
671    /// To correctly reconstruct `self` from `before` and `after`, one must introduce
672    /// additional input columns, permute all input columns to their locations as
673    /// expected by `self`, follow this by new columns appended by `before`, and
674    /// remove all other columns that may be present.
675    ///
676    /// # Example
677    ///
678    /// ```rust
679    /// use mz_expr::{BinaryFunc, MapFilterProject, MirScalarExpr, func};
680    ///
681    /// // imagine an action on columns (a, b, c, d).
682    /// let original = MapFilterProject::new(4).map(vec![
683    ///    MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::AddInt64),
684    ///    MirScalarExpr::column(2).call_binary(MirScalarExpr::column(4), func::AddInt64),
685    ///    MirScalarExpr::column(3).call_binary(MirScalarExpr::column(5), func::AddInt64),
686    /// ]).project(vec![6]);
687    ///
688    /// // Imagine we start with columns (b, x, a, y, c).
689    /// //
690    /// // The `partition` method requires a map from *expected* input columns to *actual*
691    /// // input columns. In the example above, the columns a, b, and c exist, and are at
692    /// // locations 2, 0, and 4 respectively. We must construct a map to this effect.
693    /// let mut available_columns = std::collections::BTreeMap::new();
694    /// available_columns.insert(0, 2);
695    /// available_columns.insert(1, 0);
696    /// available_columns.insert(2, 4);
697    /// // Partition `original` using the available columns and current input arity.
698    /// // This informs `partition` which columns are available, where they can be found,
699    /// // and how many columns are not relevant but should be preserved.
700    /// let (before, after) = original.partition(available_columns, 5);
701    ///
702    /// // `before` sees all five input columns, and should append `a + b + c`.
703    /// assert_eq!(before, MapFilterProject::new(5).map(vec![
704    ///    MirScalarExpr::column(2).call_binary(MirScalarExpr::column(0), func::AddInt64),
705    ///    MirScalarExpr::column(4).call_binary(MirScalarExpr::column(5), func::AddInt64),
706    /// ]).project(vec![0, 1, 2, 3, 4, 6]));
707    ///
708    /// // `after` expects to see `(a, b, c, d, a + b + c)`.
709    /// assert_eq!(after, MapFilterProject::new(5).map(vec![
710    ///    MirScalarExpr::column(3).call_binary(MirScalarExpr::column(4), func::AddInt64)
711    /// ]).project(vec![5]));
712    ///
713    /// // To reconstruct `self`, we must introduce the columns that are not present,
714    /// // and present them in the order intended by `self`. In this example, we must
715    /// // introduce column d and permute the columns so that they begin (a, b, c, d).
716    /// // The columns x and y must be projected away, and any columns introduced by
717    /// // `begin` must be retained in their current order.
718    ///
719    /// // The `after` instance expects to be provided with all inputs, but it
720    /// // may not need all inputs. The `demand()` and `permute()` methods can
721    /// // optimize the representation.
722    /// ```
723    pub fn partition(self, available: BTreeMap<usize, usize>, input_arity: usize) -> (Self, Self) {
724        // Map expressions, filter predicates, and projections for `before` and `after`.
725        let mut before_expr = Vec::new();
726        let mut before_pred = Vec::new();
727        let mut before_proj = Vec::new();
728        let mut after_expr = Vec::new();
729        let mut after_pred = Vec::new();
730        let mut after_proj = Vec::new();
731
732        // Track which output columns must be preserved in the output of `before`.
733        let mut demanded = BTreeSet::new();
734        demanded.extend(0..self.input_arity);
735        demanded.extend(self.projection.iter());
736
737        // Determine which map expressions can be computed from the available subset.
738        // Some expressions may depend on other expressions, but by evaluating them
739        // in forward order we should accurately determine the available expressions.
740        let mut available_expr = vec![false; self.input_arity];
741        // Initialize available columns from `available`, which is then not used again.
742        for index in available.keys() {
743            available_expr[*index] = true;
744        }
745        for expr in self.expressions.into_iter() {
746            // We treat an expression as available if its supporting columns are available,
747            // and if it is not a literal (we want to avoid pushing down literals). This
748            // choice is ad-hoc, but the intent is that we partition the operators so
749            // that we can reduce the row representation size and total computation.
750            // Pushing down literals harms the former and does nothing for the latter.
751            // In the future, we'll want to have a harder think about this trade-off, as
752            // we are certainly making sub-optimal decisions by pushing down all available
753            // work.
754            // TODO(mcsherry): establish better principles about what work to push down.
755            let is_available = expr.support().into_iter().all(|i| available_expr[i])
756                && !OptimizableExpr::is_literal(&expr);
757            if is_available {
758                before_expr.push(expr);
759            } else {
760                demanded.extend(expr.support());
761                after_expr.push(expr);
762            }
763            available_expr.push(is_available);
764        }
765
766        // Determine which predicates can be computed from the available subset.
767        for (_when, pred) in self.predicates.into_iter() {
768            let is_available = pred.support().into_iter().all(|i| available_expr[i]);
769            if is_available {
770                before_pred.push(pred);
771            } else {
772                demanded.extend(pred.support());
773                after_pred.push(pred);
774            }
775        }
776
777        // Map from prior output location to location in un-projected `before`.
778        // This map is used to correct references in `before` but it should be
779        // adjusted to reflect `before`s projection prior to use in `after`.
780        let mut before_map = available;
781        // Input columns include any additional undescribed columns that may
782        // not be captured by the `available` argument, so we must independently
783        // track the current number of columns (vs relying on `before_map.len()`).
784        let mut input_columns = input_arity;
785        for index in self.input_arity..available_expr.len() {
786            if available_expr[index] {
787                before_map.insert(index, input_columns);
788                input_columns += 1;
789            }
790        }
791
792        // Permute the column references in `before` expressions and predicates.
793        for expr in before_expr.iter_mut() {
794            expr.permute_map(&before_map);
795        }
796        for pred in before_pred.iter_mut() {
797            pred.permute_map(&before_map);
798        }
799
800        // Demand information determines `before`s output projection.
801        // Specifically, we produce all input columns in the output, as well as
802        // any columns that are available and demanded.
803        before_proj.extend(0..input_arity);
804        for index in self.input_arity..available_expr.len() {
805            // If an intermediate result is both available and demanded,
806            // we should produce it as output.
807            if available_expr[index] && demanded.contains(&index) {
808                // Use the new location of `index`.
809                before_proj.push(before_map[&index]);
810            }
811        }
812
813        // Map from prior output locations to location in post-`before` columns.
814        // This map is used to correct references in `after`.
815        // The presumption is that `after` will be presented with all input columns,
816        // followed by the output columns introduced by `before` in order.
817        let mut after_map = BTreeMap::new();
818        for index in 0..self.input_arity {
819            after_map.insert(index, index);
820        }
821        for index in self.input_arity..available_expr.len() {
822            // If an intermediate result is both available and demanded,
823            // it was produced as output.
824            if available_expr[index] && demanded.contains(&index) {
825                // We expect to find the output as far after `self.input_arity` as
826                // it was produced after `input_arity` in the output of `before`.
827                let location = self.input_arity
828                    + (before_proj
829                        .iter()
830                        .position(|x| x == &before_map[&index])
831                        .unwrap()
832                        - input_arity);
833                after_map.insert(index, location);
834            }
835        }
836        // We must now re-map the remaining non-demanded expressions, which are
837        // contiguous rather than potentially interspersed.
838        for index in self.input_arity..available_expr.len() {
839            if !available_expr[index] {
840                after_map.insert(index, after_map.len());
841            }
842        }
843
844        // Permute the column references in `after` expressions and predicates.
845        for expr in after_expr.iter_mut() {
846            expr.permute_map(&after_map);
847        }
848        for pred in after_pred.iter_mut() {
849            pred.permute_map(&after_map);
850        }
851        // Populate `after` projection with the new locations of `self.projection`.
852        for index in self.projection {
853            after_proj.push(after_map[&index]);
854        }
855
856        // Form and return the before and after MapFilterProject instances.
857        let before = Self::new(input_arity)
858            .map(before_expr)
859            .filter(before_pred)
860            .project(before_proj.clone());
861        let after = Self::new(self.input_arity + (before_proj.len() - input_arity))
862            .map(after_expr)
863            .filter(after_pred)
864            .project(after_proj);
865        (before, after)
866    }
867
868    /// Lists input columns whose values are used in outputs.
869    ///
870    /// You can use `BTreeSet::last()` to extract the maximum demanded column from the set.
871    ///
872    /// It is entirely appropriate to determine the demand of an instance
873    /// and then both apply a projection to the subject of the instance and
874    /// `self.permute` this instance.
875    pub fn demand(&self) -> BTreeSet<usize> {
876        let mut demanded = BTreeSet::new();
877        for (_index, pred) in self.predicates.iter() {
878            demanded.extend(pred.support());
879        }
880        demanded.extend(self.projection.iter().cloned());
881        for index in (0..self.expressions.len()).rev() {
882            if demanded.contains(&(self.input_arity + index)) {
883                demanded.extend(self.expressions[index].support());
884            }
885        }
886        demanded.retain(|col| col < &self.input_arity);
887        demanded
888    }
889
890    /// Update input column references, due to an input projection or permutation.
891    ///
892    /// The `shuffle` argument remaps expected column identifiers to new locations,
893    /// with the expectation that `shuffle` describes all input columns, and so the
894    /// intermediate results will be able to start at position `shuffle.len()`.
895    ///
896    /// The supplied `shuffle` might not list columns that are not "demanded" by the
897    /// instance, and so we should ensure that `self` is optimized to not reference
898    /// columns that are not demanded.
899    pub fn permute_fn<F>(&mut self, remap: F, new_input_arity: usize)
900    where
901        F: Fn(usize) -> usize,
902    {
903        let (mut map, mut filter, mut project) = self.as_map_filter_project();
904        let map_len = map.len();
905        let action = |col: &mut usize| {
906            if self.input_arity <= *col && *col < self.input_arity + map_len {
907                *col = new_input_arity + (*col - self.input_arity);
908            } else {
909                *col = remap(*col);
910            }
911        };
912        for expr in map.iter_mut() {
913            expr.visit_columns(action);
914        }
915        for pred in filter.iter_mut() {
916            pred.visit_columns(action);
917        }
918        for proj in project.iter_mut() {
919            action(proj);
920            assert!(*proj < new_input_arity + map.len());
921        }
922        *self = Self::new(new_input_arity)
923            .map(map)
924            .filter(filter)
925            .project(project)
926    }
927}
928
929// Optimization routines.
930impl<E: OptimizableExpr> MapFilterProject<E> {
931    /// Optimize the internal expression evaluation order.
932    ///
933    /// This method performs several optimizations that are meant to streamline
934    /// the execution of the `MapFilterProject` instance, but not to alter its
935    /// semantics. This includes extracting expressions that are used multiple
936    /// times, inlining those that are not, and removing expressions that are
937    /// unreferenced.
938    ///
939    /// This method will inline all temporal expressions, and remove any columns
940    /// that are not demanded by the output, which should transform any temporal
941    /// filters to a state where the temporal expressions exist only in the list
942    /// of predicates.
943    ///
944    /// # Example
945    ///
946    /// This example demonstrates how the re-use of one expression, converting
947    /// column 1 from a string to an integer, can be extracted and the results
948    /// shared among the two uses. This example is used for each of the steps
949    /// along the optimization path.
950    ///
951    /// ```rust
952    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
953    /// // Demonstrate extraction of common expressions (here: parsing strings).
954    /// let mut map_filter_project = MapFilterProject::new(5)
955    ///     .map(vec![
956    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(1).call_unary(func::CastStringToInt64), func::AddInt64),
957    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
958    ///     ])
959    ///     .project(vec![3,4,5,6]);
960    ///
961    /// let mut expected_optimized = MapFilterProject::new(5)
962    ///     .map(vec![
963    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
964    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(5), func::AddInt64),
965    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
966    ///     ])
967    ///     .project(vec![3,4,6,7]);
968    ///
969    /// // Optimize the expression.
970    /// map_filter_project.optimize();
971    ///
972    /// assert_eq!(
973    ///     map_filter_project,
974    ///     expected_optimized,
975    /// );
976    /// ```
977    pub fn optimize(&mut self) {
978        // Track sizes and iterate as long as they decrease.
979        let mut prev_size = None;
980        let mut self_size = usize::max_value();
981        // Continue as long as strict improvements occur.
982        while prev_size.map(|p| self_size < p).unwrap_or(true) {
983            // Lock in current size.
984            prev_size = Some(self_size);
985
986            // We have an annoying pattern of mapping literals that already exist as columns (by filters).
987            // Try to identify this pattern, of a map that introduces an expression equated to a prior column,
988            // and then replace the mapped expression by a column reference.
989            //
990            // We think this is due to `LiteralLifting`, and we might investigate removing the introduciton in
991            // the first place. The tell-tale that we see when we fix is a diff that look likes
992            //
993            // - Project (#0, #2)
994            // -   Filter (#1 = 1)
995            // -     Map (1)
996            // -       Get l0
997            // + Filter (#1 = 1)
998            // +   Get l0
999            //
1000            for (index, expr) in self.expressions.iter_mut().enumerate() {
1001                // If `expr` matches a filter equating it to a column < index + input_arity, rewrite it
1002                for (_, predicate) in self.predicates.iter() {
1003                    if let Some(col) =
1004                        E::equality_column_alias(predicate, expr, index + self.input_arity)
1005                    {
1006                        *expr = col;
1007                    }
1008                }
1009            }
1010
1011            // Optimization memoizes individual `ScalarExpr` expressions that
1012            // are sure to be evaluated, canonicalizes references to the first
1013            // occurrence of each, inlines expressions that have a reference
1014            // count of one, and then removes any expressions that are not
1015            // referenced.
1016            self.memoize_expressions();
1017            self.predicates.sort();
1018            self.predicates.dedup();
1019            self.inline_expressions();
1020            self.remove_undemanded();
1021
1022            // Re-build `self` from parts to restore evaluation order invariants.
1023            let (map, filter, project) = self.as_map_filter_project();
1024            *self = Self::new(self.input_arity)
1025                .map(map)
1026                .filter(filter)
1027                .project(project);
1028
1029            self_size = self.size();
1030        }
1031    }
1032
1033    /// Total expression sizes across all expressions.
1034    pub fn size(&self) -> usize {
1035        self.expressions
1036            .iter()
1037            .map(|e| OptimizableExpr::size(e))
1038            .sum::<usize>()
1039            + self
1040                .predicates
1041                .iter()
1042                .map(|(_, e)| OptimizableExpr::size(e))
1043                .sum::<usize>()
1044    }
1045
1046    /// Place each certainly evaluated expression in its own column.
1047    ///
1048    /// This method places each non-trivial, certainly evaluated expression
1049    /// in its own column, and deduplicates them so that all references to
1050    /// the same expression reference the same column.
1051    ///
1052    /// This transformation is restricted to expressions we are certain will
1053    /// be evaluated, which does not include expressions in `if` statements.
1054    ///
1055    /// # Example
1056    ///
1057    /// This example demonstrates how memoization notices `MirScalarExpr`s
1058    /// that are used multiple times, and ensures that each are extracted
1059    /// into columns and then referenced by column. This pass does not try
1060    /// to minimize the occurrences of column references, which will happen
1061    /// in inlining.
1062    ///
1063    /// ```rust
1064    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1065    /// // Demonstrate extraction of common expressions (here: parsing strings).
1066    /// let mut map_filter_project = MapFilterProject::new(5)
1067    ///     .map(vec![
1068    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(1).call_unary(func::CastStringToInt64), func::AddInt64),
1069    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1070    ///     ])
1071    ///     .project(vec![3,4,5,6]);
1072    ///
1073    /// let mut expected_optimized = MapFilterProject::new(5)
1074    ///     .map(vec![
1075    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1076    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1077    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), func::AddInt64),
1078    ///         MirScalarExpr::column(7),
1079    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1080    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), func::AddInt64),
1081    ///         MirScalarExpr::column(10),
1082    ///     ])
1083    ///     .project(vec![3,4,8,11]);
1084    ///
1085    /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1086    /// map_filter_project.memoize_expressions();
1087    ///
1088    /// assert_eq!(
1089    ///     map_filter_project,
1090    ///     expected_optimized,
1091    /// );
1092    /// ```
1093    ///
1094    /// Expressions may not be memoized if they are not certain to be evaluated,
1095    /// for example if they occur in conditional branches of a `MirScalarExpr::If`.
1096    ///
1097    /// ```rust
1098    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1099    /// // Demonstrate extraction of unconditionally evaluated expressions, as well as
1100    /// // the non-extraction of common expressions guarded by conditions.
1101    /// let mut map_filter_project = MapFilterProject::new(2)
1102    ///     .map(vec![
1103    ///         MirScalarExpr::If {
1104    ///             cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt)),
1105    ///             then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1106    ///             els:  Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1107    ///         },
1108    ///         MirScalarExpr::If {
1109    ///             cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt)),
1110    ///             then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1111    ///             els:  Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1112    ///         },
1113    ///     ]);
1114    ///
1115    /// let mut expected_optimized = MapFilterProject::new(2)
1116    ///     .map(vec![
1117    ///         MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt),
1118    ///         MirScalarExpr::If {
1119    ///             cond: Box::new(MirScalarExpr::column(2)),
1120    ///             then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1121    ///             els:  Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1122    ///         },
1123    ///         MirScalarExpr::column(3),
1124    ///         MirScalarExpr::If {
1125    ///             cond: Box::new(MirScalarExpr::column(2)),
1126    ///             then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1127    ///             els:  Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1128    ///         },
1129    ///         MirScalarExpr::column(5),
1130    ///     ])
1131    ///     .project(vec![0,1,4,6]);
1132    ///
1133    /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1134    /// map_filter_project.memoize_expressions();
1135    ///
1136    /// assert_eq!(
1137    ///     map_filter_project,
1138    ///     expected_optimized,
1139    /// );
1140    /// ```
1141    pub fn memoize_expressions(&mut self) {
1142        // Record the mapping from starting column references to new column
1143        // references.
1144        let mut remaps = BTreeMap::new();
1145        for index in 0..self.input_arity {
1146            remaps.insert(index, index);
1147        }
1148        let mut new_expressions = Vec::new();
1149
1150        // We follow the same order as for evaluation, to ensure that all
1151        // column references exist in time for their evaluation. We could
1152        // prioritize predicates, but we would need to be careful to chase
1153        // down column references to expressions and memoize those as well.
1154        let mut expression = 0;
1155        for (support, predicate) in self.predicates.iter_mut() {
1156            while self.input_arity + expression < *support {
1157                self.expressions[expression].permute_map(&remaps);
1158                memoize_expr(
1159                    &mut self.expressions[expression],
1160                    &mut new_expressions,
1161                    self.input_arity,
1162                );
1163                remaps.insert(
1164                    self.input_arity + expression,
1165                    self.input_arity + new_expressions.len(),
1166                );
1167                new_expressions.push(self.expressions[expression].clone());
1168                expression += 1;
1169            }
1170            predicate.permute_map(&remaps);
1171            memoize_expr(predicate, &mut new_expressions, self.input_arity);
1172        }
1173        while expression < self.expressions.len() {
1174            self.expressions[expression].permute_map(&remaps);
1175            memoize_expr(
1176                &mut self.expressions[expression],
1177                &mut new_expressions,
1178                self.input_arity,
1179            );
1180            remaps.insert(
1181                self.input_arity + expression,
1182                self.input_arity + new_expressions.len(),
1183            );
1184            new_expressions.push(self.expressions[expression].clone());
1185            expression += 1;
1186        }
1187
1188        self.expressions = new_expressions;
1189        for proj in self.projection.iter_mut() {
1190            *proj = remaps[proj];
1191        }
1192
1193        // Restore predicate order invariants.
1194        for (pos, pred) in self.predicates.iter_mut() {
1195            *pos = pred.support().last().map(|x| *x + 1).unwrap_or(0);
1196        }
1197    }
1198
1199    /// This method inlines expressions with a single use.
1200    ///
1201    /// This method only inlines expressions; it does not delete expressions
1202    /// that are no longer referenced. The `remove_undemanded()` method does
1203    /// that, and should likely be used after this method.
1204    ///
1205    /// Inlining replaces column references when the referred-to item is either
1206    /// another column reference, or the only referrer of its referent. This
1207    /// is most common after memoization has atomized all expressions to seek
1208    /// out re-use: inlining re-assembles expressions that were not helpfully
1209    /// shared with other expressions.
1210    ///
1211    /// # Example
1212    ///
1213    /// In this example, we see that with only a single reference to columns
1214    /// 0 and 2, their parsing can each be inlined. Similarly, column references
1215    /// can be cleaned up among expressions, and in the final projection.
1216    ///
1217    /// Also notice the remaining expressions, which can be cleaned up in a later
1218    /// pass (the `remove_undemanded` method).
1219    ///
1220    /// ```rust
1221    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1222    /// // Use the output from first `memoize_expression` example.
1223    /// let mut map_filter_project = MapFilterProject::new(5)
1224    ///     .map(vec![
1225    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1226    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1227    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), func::AddInt64),
1228    ///         MirScalarExpr::column(7),
1229    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1230    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), func::AddInt64),
1231    ///         MirScalarExpr::column(10),
1232    ///     ])
1233    ///     .project(vec![3,4,8,11]);
1234    ///
1235    /// let mut expected_optimized = MapFilterProject::new(5)
1236    ///     .map(vec![
1237    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1238    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1239    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1240    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1241    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1242    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1243    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1244    ///     ])
1245    ///     .project(vec![3,4,8,11]);
1246    ///
1247    /// // Inline expressions that are referenced only once.
1248    /// map_filter_project.inline_expressions();
1249    ///
1250    /// assert_eq!(
1251    ///     map_filter_project,
1252    ///     expected_optimized,
1253    /// );
1254    /// ```
1255    pub fn inline_expressions(&mut self) {
1256        // Local copy of input_arity to avoid borrowing `self` in closures.
1257        let input_arity = self.input_arity;
1258        // Reference counts track the number of places that a reference occurs.
1259        let mut reference_count = vec![0; input_arity + self.expressions.len()];
1260        // Increment reference counts for each use
1261        for expr in self.expressions.iter() {
1262            expr.visit_pre(&mut |e| {
1263                if let Some(i) = e.as_column() {
1264                    reference_count[i] += 1;
1265                }
1266            })
1267            .expect("visit_pre hit recursion limit");
1268        }
1269        for (_, pred) in self.predicates.iter() {
1270            pred.visit_pre(&mut |e| {
1271                if let Some(i) = e.as_column() {
1272                    reference_count[i] += 1;
1273                }
1274            })
1275            .expect("visit_pre hit recursion limit");
1276        }
1277        for proj in self.projection.iter() {
1278            reference_count[*proj] += 1;
1279        }
1280
1281        // Determine which expressions should be inlined because they reference temporal expressions.
1282        let mut is_temporal = vec![false; input_arity];
1283        for expr in self.expressions.iter() {
1284            // An express may contain a temporal expression, or reference a column containing such.
1285            is_temporal.push(
1286                OptimizableExpr::contains_temporal(expr)
1287                    || expr.support().into_iter().any(|col| is_temporal[col]),
1288            );
1289        }
1290
1291        // Inline only those columns that 1. are expressions not inputs, and
1292        // 2a. are column references or literals or 2b. have a refcount of 1,
1293        // or 2c. reference temporal expressions (which cannot be evaluated).
1294        let mut should_inline = vec![false; reference_count.len()];
1295        for i in (input_arity..reference_count.len()).rev() {
1296            if let Some(c) = self.expressions[i - input_arity].as_column() {
1297                should_inline[i] = true;
1298                // The reference count of the referenced column should be
1299                // incremented with the number of references
1300                // `self.expressions[i - input_arity]` has.
1301                // Subtract 1 because `self.expressions[i - input_arity]` is
1302                // itself a reference.
1303                reference_count[c] += reference_count[i] - 1;
1304            } else {
1305                should_inline[i] = reference_count[i] == 1 || is_temporal[i];
1306            }
1307        }
1308        // Inline expressions per `should_inline`.
1309        self.perform_inlining(should_inline);
1310        // We can only inline column references in `self.projection`, but we should.
1311        for proj in self.projection.iter_mut() {
1312            if *proj >= self.input_arity {
1313                if let Some(i) = self.expressions[*proj - self.input_arity].as_column() {
1314                    // TODO(mgree) !!! propagate name information to projection
1315                    *proj = i;
1316                }
1317            }
1318        }
1319    }
1320
1321    /// Inlines those expressions that are indicated by should_inline.
1322    /// See `inline_expressions` for usage.
1323    pub fn perform_inlining(&mut self, should_inline: Vec<bool>) {
1324        for index in 0..self.expressions.len() {
1325            let (prior, expr) = self.expressions.split_at_mut(index);
1326            expr[0]
1327                .visit_mut_post(&mut |e| {
1328                    if let Some(i) = e.as_column() {
1329                        if should_inline[i] {
1330                            *e = prior[i - self.input_arity].clone();
1331                        }
1332                    }
1333                })
1334                .expect("inlining hit recursion limit");
1335        }
1336        for (_index, pred) in self.predicates.iter_mut() {
1337            let expressions = &self.expressions;
1338            pred.visit_mut_post(&mut |e| {
1339                if let Some(i) = e.as_column() {
1340                    if should_inline[i] {
1341                        *e = expressions[i - self.input_arity].clone();
1342                    }
1343                }
1344            })
1345            .expect("inlining hit recursion limit");
1346        }
1347    }
1348
1349    /// Removes unused expressions from `self.expressions`.
1350    ///
1351    /// Expressions are "used" if they are relied upon by any output columns
1352    /// or any predicates, even transitively. Any expressions that are not
1353    /// relied upon in this way can be discarded.
1354    ///
1355    /// # Example
1356    ///
1357    /// ```rust
1358    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1359    /// // Use the output from `inline_expression` example.
1360    /// let mut map_filter_project = MapFilterProject::new(5)
1361    ///     .map(vec![
1362    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1363    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1364    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1365    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1366    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1367    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1368    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1369    ///     ])
1370    ///     .project(vec![3,4,8,11]);
1371    ///
1372    /// let mut expected_optimized = MapFilterProject::new(5)
1373    ///     .map(vec![
1374    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1375    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(5), func::AddInt64),
1376    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1377    ///     ])
1378    ///     .project(vec![3,4,6,7]);
1379    ///
1380    /// // Remove undemanded expressions, streamlining the work done..
1381    /// map_filter_project.remove_undemanded();
1382    ///
1383    /// assert_eq!(
1384    ///     map_filter_project,
1385    ///     expected_optimized,
1386    /// );
1387    /// ```
1388    pub fn remove_undemanded(&mut self) {
1389        // Determine the demanded expressions to remove irrelevant ones.
1390        let mut demand = BTreeSet::new();
1391        for (_index, pred) in self.predicates.iter() {
1392            demand.extend(pred.support());
1393        }
1394        // Start from the output columns as presumed demanded.
1395        // If this is not the case, the caller should project some away.
1396        demand.extend(self.projection.iter().cloned());
1397        // Proceed in *reverse* order, as expressions may depend on other
1398        // expressions that precede them.
1399        for index in (0..self.expressions.len()).rev() {
1400            if demand.contains(&(self.input_arity + index)) {
1401                demand.extend(self.expressions[index].support());
1402            }
1403        }
1404
1405        // Maintain a map from initial column identifiers to locations
1406        // once we have removed undemanded expressions.
1407        let mut remap = BTreeMap::new();
1408        // This map only needs to map elements of `demand` to a new location,
1409        // but the logic is easier if we include all input columns (as the
1410        // new position is then determined by the size of the map).
1411        for index in 0..self.input_arity {
1412            remap.insert(index, index);
1413        }
1414        // Retain demanded expressions, and record their new locations.
1415        let mut new_expressions = Vec::new();
1416        for (index, expr) in self.expressions.drain(..).enumerate() {
1417            if demand.contains(&(index + self.input_arity)) {
1418                remap.insert(index + self.input_arity, remap.len());
1419                new_expressions.push(expr);
1420            }
1421        }
1422        self.expressions = new_expressions;
1423
1424        // Update column identifiers; rebuild `Self` to re-establish any invariants.
1425        // We mirror `self.permute(&remap)` but we specifically want to remap columns
1426        // that are produced by `self.expressions` after the input columns.
1427        let (expressions, predicates, projection) = self.as_map_filter_project();
1428        *self = Self::new(self.input_arity)
1429            .map(expressions.into_iter().map(|mut e| {
1430                e.permute_map(&remap);
1431                e
1432            }))
1433            .filter(predicates.into_iter().map(|mut p| {
1434                p.permute_map(&remap);
1435                p
1436            }))
1437            .project(projection.into_iter().map(|c| remap[&c]));
1438    }
1439}
1440
1441// TODO: move this elsewhere?
1442/// Recursively memoize parts of `expr`, storing those parts in `memoized_parts`.
1443///
1444/// A part of `expr` that is memoized is replaced by a reference to column
1445/// `(input_arity + pos)`, where `pos` is the position of the memoized part in
1446/// `memoized_parts`, and `input_arity` is the arity of the input that `expr`
1447/// refers to.
1448pub fn memoize_expr<E: OptimizableExpr>(
1449    expr: &mut E,
1450    memoized_parts: &mut Vec<E>,
1451    input_arity: usize,
1452) {
1453    #[allow(deprecated)]
1454    expr.visit_mut_pre_post(&mut |e| e.eager_children(), &mut |e| {
1455        if E::is_literal(e) {
1456            // Literals do not need to be memoized.
1457            return;
1458        }
1459        if let Some(col) = e.as_column_mut() {
1460            // Column references do not need to be memoized, but may need to be
1461            // updated if they reference a column reference themselves.
1462            if *col > input_arity {
1463                if let Some(col2) = memoized_parts[*col - input_arity].as_column() {
1464                    // Update the column index in place, preserving any name information.
1465                    *col = col2;
1466                }
1467            }
1468            return;
1469        }
1470        // TODO: OOO (Optimizer Optimization Opportunity):
1471        // we are quadratic in expression size because of this .iter().position
1472        if let Some(position) = memoized_parts.iter().position(|e2| e2 == e) {
1473            // Any complex expression that already exists as a prior column can
1474            // be replaced by a reference to that column.
1475            *e = E::column(input_arity + position);
1476        } else {
1477            // A complex expression that does not exist should be memoized, and
1478            // replaced by a reference to the column.
1479            memoized_parts.push(std::mem::replace(
1480                e,
1481                E::column(input_arity + memoized_parts.len()),
1482            ));
1483        }
1484    })
1485    .expect("memoize_expr hit recursion limit");
1486}
1487
1488pub mod util {
1489    use std::collections::BTreeMap;
1490
1491    use crate::MirScalarExpr;
1492    use crate::scalar::columns::Columns;
1493
1494    #[allow(dead_code)]
1495    /// A triple of actions that map from rows to (key, val) pairs and back again.
1496    struct KeyValRowMapping {
1497        /// Expressions to apply to a row to produce key datums.
1498        to_key: Vec<MirScalarExpr>,
1499        /// Columns to project from a row to produce residual value datums.
1500        to_val: Vec<usize>,
1501        /// Columns to project from the concatenation of key and value to reconstruct the row.
1502        to_row: Vec<usize>,
1503    }
1504
1505    /// Derive supporting logic to support transforming rows to (key, val) pairs,
1506    /// and back again.
1507    ///
1508    /// We are given as input a list of mappings from columns to key indices, a key length,
1509    /// and an input arity. (The produced key should be the application of the key expressions.)
1510    ///
1511    /// To produce the `val` output, we will identify those input columns not found in
1512    /// the key expressions, and name all other columns.
1513    /// To reconstitute the original row, we identify the sequence of columns from the
1514    /// concatenation of key and val which would reconstruct the original row.
1515    ///
1516    /// The output is a pair of column sequences, the first used to reconstruct a row
1517    /// from the concatenation of key and value, and the second to identify the columns
1518    /// of a row that should become the value associated with its key.
1519    ///
1520    /// The permutations and thinning expressions generated here will be tracked in
1521    /// `compute_types::plan::AvailableCollections`; see the
1522    /// documentation there for more details.
1523    pub fn permutation_for_arrangement(
1524        key: &[impl Columns],
1525        unthinned_arity: usize,
1526    ) -> (Vec<usize>, Vec<usize>) {
1527        let columns_in_key: BTreeMap<_, _> = key
1528            .iter()
1529            .enumerate()
1530            .filter_map(|(i, key_col)| key_col.as_column().map(|c| (c, i)))
1531            .collect();
1532
1533        let mut input_cursor = key.len();
1534        let permutation = (0..unthinned_arity)
1535            .map(|c| {
1536                if let Some(c) = columns_in_key.get(&c) {
1537                    // Column is in key (and thus gone from the value
1538                    // of the thinned representation)
1539                    *c
1540                } else {
1541                    // Column remains in value of the thinned representation
1542                    input_cursor += 1;
1543                    input_cursor - 1
1544                }
1545            })
1546            .collect();
1547        let thinning = (0..unthinned_arity)
1548            .filter(|c| !columns_in_key.contains_key(c))
1549            .collect();
1550        (permutation, thinning)
1551    }
1552
1553    /// Given the permutations (see [`permutation_for_arrangement`] and
1554    /// (`dataflow::plan::AvailableCollections`) corresponding to two
1555    /// collections with the same key arity,
1556    /// computes the permutation for the result of joining them.
1557    pub fn join_permutations(
1558        key_arity: usize,
1559        stream_permutation: Vec<usize>,
1560        thinned_stream_arity: usize,
1561        lookup_permutation: Vec<usize>,
1562    ) -> BTreeMap<usize, usize> {
1563        let stream_arity = stream_permutation.len();
1564        let lookup_arity = lookup_permutation.len();
1565
1566        (0..stream_arity + lookup_arity)
1567            .map(|i| {
1568                let location = if i < stream_arity {
1569                    stream_permutation[i]
1570                } else {
1571                    let location_in_lookup = lookup_permutation[i - stream_arity];
1572                    if location_in_lookup < key_arity {
1573                        location_in_lookup
1574                    } else {
1575                        location_in_lookup + thinned_stream_arity
1576                    }
1577                };
1578                (i, location)
1579            })
1580            .collect()
1581    }
1582}
1583
/// Row-by-row evaluation plans derived from a `MapFilterProject`.
///
/// `SafeMfpPlan` wraps a `MapFilterProject` whose expressions can simply be
/// evaluated. `MfpPlan` additionally carries lower and upper bounds extracted
/// from temporal (`mz_now`) predicates, which are evaluated separately from the
/// ordinary predicates.
pub mod plan {
    use std::iter;

    use mz_repr::{Datum, Diff, Row, RowArena};
    use serde::{Deserialize, Serialize};

    use crate::Eval;
    use crate::scalar::optimizable::OptimizableExpr;
    use crate::{EvalError, MapFilterProject, MirScalarExpr};

    /// A wrapper type which indicates it is safe to simply evaluate all expressions.
    ///
    /// `MfpPlan::create_from` produces one after moving every predicate that
    /// contains a temporal expression out of the `MapFilterProject`. Note that
    /// `SafeMfpPlan::from_mfp` wraps its argument without performing any check.
    #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
    #[serde(bound(deserialize = "E: serde::de::DeserializeOwned"))]
    pub struct SafeMfpPlan<E: OptimizableExpr = MirScalarExpr> {
        pub(crate) mfp: MapFilterProject<E>,
    }

    impl<E: OptimizableExpr> SafeMfpPlan<E> {
        /// Wrap a `MapFilterProject` in a `SafeMfpPlan`.
        ///
        /// No validation is performed; the caller vouches that all expressions
        /// can be evaluated directly.
        pub fn from_mfp(mfp: MapFilterProject<E>) -> Self {
            Self { mfp }
        }

        /// Unwrap the inner `MapFilterProject`.
        pub fn into_mfp(self) -> MapFilterProject<E> {
            self.mfp
        }

        /// Remaps references to input columns according to `remap`.
        ///
        /// Leaves other column references, e.g. to newly mapped columns, unchanged.
        /// Delegates to `MapFilterProject::permute_fn` on the wrapped value.
        pub fn permute_fn<F>(&mut self, remap: F, new_arity: usize)
        where
            F: Fn(usize) -> usize,
        {
            self.mfp.permute_fn(remap, new_arity);
        }

        /// Returns true when `Self` is the identity.
        pub fn is_identity(&self) -> bool {
            self.mfp.is_identity()
        }
    }

    impl<E: OptimizableExpr + Eval> SafeMfpPlan<E> {
        /// Evaluates the linear operator on a supplied list of datums.
        ///
        /// The arguments are the initial datums associated with the row,
        /// and an appropriately lifetimed arena for temporary allocations
        /// needed by scalar evaluation. On return, `datums` has been extended
        /// with the values of any expressions that were evaluated.
        ///
        /// An `Ok` result will either be `None` if any predicate did not
        /// evaluate to `Datum::True`, or the values of the columns listed
        /// by `self.projection` if all predicates passed. If an error
        /// occurs in the evaluation it is returned as an `Err` variant.
        /// As the evaluation exits early with failed predicates, it may
        /// miss some errors that would occur later in evaluation.
        ///
        /// `row_buf` is not cleared up front. It is repacked with the projected
        /// datums (via `Row::packer`) only when the function returns
        /// `Ok(Some(row))`; otherwise it is left untouched.
        #[inline(always)]
        pub fn evaluate_into<'a, 'row>(
            &'a self,
            datums: &mut Vec<Datum<'a>>,
            arena: &'a RowArena,
            row_buf: &'row mut Row,
        ) -> Result<Option<&'row Row>, EvalError> {
            let passed_predicates = self.evaluate_inner(datums, arena)?;
            if !passed_predicates {
                Ok(None)
            } else {
                row_buf
                    .packer()
                    .extend(self.mfp.projection.iter().map(|c| datums[*c]));
                Ok(Some(row_buf))
            }
        }

        /// A version of `evaluate_into` which produces an iterator over `Datum`
        /// as output.
        ///
        /// This version can be useful when one wants to capture the resulting
        /// datums without packing and then unpacking a row.
        #[inline(always)]
        pub fn evaluate_iter<'b, 'a: 'b>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
        ) -> Result<Option<impl Iterator<Item = Datum<'a>> + 'b>, EvalError> {
            let passed_predicates = self.evaluate_inner(datums, arena)?;
            if !passed_predicates {
                Ok(None)
            } else {
                Ok(Some(self.mfp.projection.iter().map(move |i| datums[*i])))
            }
        }

        /// Populates `datums` with `self.expressions` and tests `self.predicates`.
        ///
        /// Predicates are tested in order. Before each one, only as many
        /// expressions are evaluated (and pushed onto `datums`) as that
        /// predicate's recorded support position requires. Returns `Ok(false)` as
        /// soon as a predicate evaluates to anything other than `Datum::True`,
        /// leaving the remaining expressions unevaluated. Otherwise all remaining
        /// expressions are evaluated and `Ok(true)` is returned.
        ///
        /// This does not apply `self.projection`, which is up to the calling method.
        pub fn evaluate_inner<'b, 'a: 'b>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
        ) -> Result<bool, EvalError> {
            // Index of the next expression in `self.mfp.expressions` to evaluate.
            let mut expression = 0;
            for (support, predicate) in self.mfp.predicates.iter() {
                // Materialize every column this predicate may read.
                while self.mfp.input_arity + expression < *support {
                    datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
                    expression += 1;
                }
                if predicate.eval(&datums[..], arena)? != Datum::True {
                    return Ok(false);
                }
            }
            // All predicates passed: evaluate whatever expressions remain.
            while expression < self.mfp.expressions.len() {
                datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
                expression += 1;
            }
            Ok(true)
        }

        /// Returns true if evaluation could introduce an error on non-error inputs.
        pub fn could_error(&self) -> bool {
            self.mfp.predicates.iter().any(|(_pos, e)| e.could_error())
                || self.mfp.expressions.iter().any(|e| e.could_error())
        }
    }

    impl<E: OptimizableExpr> std::ops::Deref for SafeMfpPlan<E> {
        type Target = MapFilterProject<E>;
        fn deref(&self) -> &Self::Target {
            &self.mfp
        }
    }

    /// Predicates partitioned into temporal and non-temporal.
    ///
    /// Temporal predicates require some recognition to determine their
    /// structure, and it is best to do that once and re-use the results.
    ///
    /// There are restrictions on the temporal predicates we currently support.
    /// They must directly constrain `MzNow` from below or above,
    /// by expressions that do not themselves contain `MzNow`.
    /// Conjunctions of such constraints are also ok.
    #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
    #[serde(bound(deserialize = "E: serde::de::DeserializeOwned"))]
    pub struct MfpPlan<E: OptimizableExpr = MirScalarExpr> {
        /// Normal predicates to evaluate on `&[Datum]` and expect `Ok(Datum::True)`.
        pub(crate) mfp: SafeMfpPlan<E>,
        /// Expressions that when evaluated lower-bound or equal (<=) `MzNow`.
        pub(crate) lower_bounds: Vec<E>,
        /// Expressions that when evaluated strictly upper-bound `MzNow`.
        pub(crate) upper_bounds: Vec<E>,
    }

    impl<E: OptimizableExpr> MfpPlan<E> {
        /// Construct an `MfpPlan` from its components.
        pub fn from_parts(mfp: SafeMfpPlan<E>, lower_bounds: Vec<E>, upper_bounds: Vec<E>) -> Self {
            Self {
                mfp,
                lower_bounds,
                upper_bounds,
            }
        }

        /// Deconstruct into components.
        pub fn into_parts(self) -> (SafeMfpPlan<E>, Vec<E>, Vec<E>) {
            (self.mfp, self.lower_bounds, self.upper_bounds)
        }

        /// Access the inner `SafeMfpPlan`.
        pub fn safe_mfp(&self) -> &SafeMfpPlan<E> {
            &self.mfp
        }

        /// Borrow all parts: `(safe_mfp, lower_bounds, upper_bounds)`.
        pub fn as_parts(&self) -> (&SafeMfpPlan<E>, &[E], &[E]) {
            (&self.mfp, &self.lower_bounds, &self.upper_bounds)
        }

        /// Partitions `predicates` into non-temporal, and lower and upper temporal bounds.
        ///
        /// The first returned list is of predicates that do not contain `mz_now`.
        /// The second and third returned lists contain expressions that, once evaluated, lower
        /// and upper bound the validity interval of a record, respectively. These second two
        /// lists are populated only by binary expressions of the form
        /// ```ignore
        /// mz_now cmp_op expr
        /// ```
        /// where `cmp_op` is a comparison operator and `expr` does not contain `mz_now`.
        ///
        /// The input is optimized first (`MapFilterProject::optimize`), and the bounds are
        /// extracted by `E::extract_temporal_bounds`.
        ///
        /// If any unsupported expression is found, for example one that uses `mz_now`
        /// in an unsupported position, an error is returned.
        pub fn create_from(mut mfp: MapFilterProject<E>) -> Result<Self, String> {
            mfp.optimize();

            // Move every predicate containing a temporal expression into `temporal`,
            // keeping the rest (with their support positions) in `mfp`.
            let mut temporal = Vec::new();
            mfp.predicates.retain(|(_position, predicate)| {
                if OptimizableExpr::contains_temporal(predicate) {
                    temporal.push(predicate.clone());
                    false
                } else {
                    true
                }
            });

            let (lower_bounds, upper_bounds) = E::extract_temporal_bounds(temporal)?;

            Ok(Self {
                mfp: SafeMfpPlan { mfp },
                lower_bounds,
                upper_bounds,
            })
        }

        /// Indicates if the planned `MapFilterProject` emits exactly its inputs as outputs.
        pub fn is_identity(&self) -> bool {
            self.mfp.mfp.is_identity()
                && self.lower_bounds.is_empty()
                && self.upper_bounds.is_empty()
        }

        /// Returns `true` if the plan contains temporal bounds
        /// (i.e., predicates involving `mz_now()`).
        pub fn has_temporal_bounds(&self) -> bool {
            !self.lower_bounds.is_empty() || !self.upper_bounds.is_empty()
        }

        /// Returns `self`, and leaves behind an identity operator that acts on its output.
        ///
        /// The identity's input arity is the length of `self`'s projection.
        pub fn take(&mut self) -> Self {
            let mut identity = Self {
                mfp: SafeMfpPlan {
                    mfp: MapFilterProject::new(self.mfp.projection.len()),
                },
                lower_bounds: Default::default(),
                upper_bounds: Default::default(),
            };
            std::mem::swap(self, &mut identity);
            identity
        }

        /// Attempt to convert self into a non-temporal MapFilterProject plan.
        ///
        /// If that is not possible, the original instance is returned as an error.
        // The error variant is `Self` by value so the caller gets the plan back.
        #[allow(clippy::result_large_err)]
        pub fn into_nontemporal(self) -> Result<SafeMfpPlan<E>, Self> {
            if self.lower_bounds.is_empty() && self.upper_bounds.is_empty() {
                Ok(self.mfp)
            } else {
                Err(self)
            }
        }

        /// Returns an iterator over mutable references to all non-temporal
        /// scalar expressions in the plan.
        ///
        /// This covers the ordinary predicates, the mapped expressions, and the
        /// lower and upper bound expressions. The bound expressions are the
        /// non-`mz_now` sides of temporal predicates and are intended not to
        /// contain `mz_now` themselves.
        ///
        /// The order of iteration is unspecified.
        pub fn iter_nontemporal_exprs(&mut self) -> impl Iterator<Item = &mut E> {
            iter::empty()
                .chain(self.mfp.mfp.predicates.iter_mut().map(|(_, expr)| expr))
                .chain(&mut self.mfp.mfp.expressions)
                .chain(&mut self.lower_bounds)
                .chain(&mut self.upper_bounds)
        }

        /// Indicates that `Self` ignores its input to the extent that it can be evaluated on `&[]`.
        ///
        /// At the moment, this is only true if it projects away all columns and applies no filters,
        /// but it could be extended to plans that produce literals independent of the input.
        pub fn ignores_input(&self) -> bool {
            self.lower_bounds.is_empty()
                && self.upper_bounds.is_empty()
                && self.mfp.mfp.projection.is_empty()
                && self.mfp.mfp.predicates.is_empty()
        }
    }

    impl<E: OptimizableExpr + Eval> MfpPlan<E> {
        /// Evaluate the predicates, temporal and non-, and return times and differences for
        /// the row whose datums are in `datums`.
        ///
        /// If `self` contains only non-temporal predicates, the result will either be `(time, diff)`,
        /// or an evaluation error. If `self` contains temporal predicates, the results can be times
        /// that are greater than the input `time`, and may contain negated `diff` values.
        ///
        /// The returned iterator yields at most two items:
        /// * An evaluation error is reported as a single `Err` at `(time, diff)`.
        /// * Otherwise, the projected row is emitted at the lower bound (the maximum of `time`
        ///   and every lower-bound expression) with `diff`.
        /// * If an upper bound was found and accepted by `valid_time`, the same row is also
        ///   emitted at the upper bound with `-diff`.
        ///
        /// No row is emitted when:
        /// * a predicate fails;
        /// * any evaluated bound was null;
        /// * `valid_time` rejects the lower bound;
        /// * the upper bound collapses onto the lower bound.
        ///
        /// Because evaluation stops early in some of these cases, errors in later bound
        /// expressions may go unreported.
        ///
        /// The `row_builder` is not cleared first, but emptied if the function
        /// returns an iterator with any `Ok(_)` element.
        ///
        /// # Panics
        ///
        /// Panics if a bound expression evaluates to a datum other than `MzTimestamp` or `Null`.
        pub fn evaluate<'b, 'a: 'b, Err: From<EvalError>, V: Fn(&mz_repr::Timestamp) -> bool>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
            time: mz_repr::Timestamp,
            diff: Diff,
            valid_time: V,
            row_builder: &mut Row,
        ) -> impl Iterator<
            Item = Result<(Row, mz_repr::Timestamp, Diff), (Err, mz_repr::Timestamp, Diff)>,
        > + use<Err, V, E> {
            // Every return below yields `Option<_>.into_iter().chain(Option<_>)` so that all
            // paths share one concrete iterator type.
            match self.mfp.evaluate_inner(datums, arena) {
                Err(e) => {
                    return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                }
                Ok(true) => {}
                Ok(false) => {
                    return None.into_iter().chain(None);
                }
            }

            // Lower and upper bounds.
            let mut lower_bound = time;
            let mut upper_bound = None;

            // Track whether we have seen a null in either bound, as this should
            // prevent the record from being produced at any time.
            let mut null_eval = false;

            // Advance our lower bound to be at least the result of any lower bound
            // expressions.
            for l in self.lower_bounds.iter() {
                match l.eval(datums, arena) {
                    Err(e) => {
                        return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                    }
                    Ok(Datum::MzTimestamp(d)) => {
                        lower_bound = lower_bound.max(d);
                    }
                    Ok(Datum::Null) => {
                        null_eval = true;
                    }
                    x => {
                        panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
                    }
                }
            }

            // If the lower bound exceeds our `until` frontier, it should not appear in the output.
            if !valid_time(&lower_bound) {
                return None.into_iter().chain(None);
            }

            // If there are any upper bounds, determine the minimum upper bound.
            for u in self.upper_bounds.iter() {
                // We can cease as soon as the lower and upper bounds match,
                // as the update will certainly not be produced in that case.
                if upper_bound != Some(lower_bound) {
                    match u.eval(datums, arena) {
                        Err(e) => {
                            return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                        }
                        Ok(Datum::MzTimestamp(d)) => {
                            if let Some(upper) = upper_bound {
                                upper_bound = Some(upper.min(d));
                            } else {
                                upper_bound = Some(d);
                            };
                            // Force the upper bound to be at least the lower
                            // bound. The `is_some()` test should always be true
                            // due to the above block, but maintain it here in
                            // case that changes. It's hopefully optimized away.
                            if upper_bound.is_some() && upper_bound < Some(lower_bound) {
                                upper_bound = Some(lower_bound);
                            }
                        }
                        Ok(Datum::Null) => {
                            null_eval = true;
                        }
                        x => {
                            panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
                        }
                    }
                }
            }

            // If the upper bound exceeds our `until` frontier, the retraction should not
            // appear in the output: drop the upper bound so only the insertion is emitted.
            if let Some(upper) = &mut upper_bound {
                if !valid_time(upper) {
                    upper_bound = None;
                }
            }

            // Produce an output only if the upper bound exceeds the lower bound,
            // and if we did not encounter a `null` in our evaluation.
            if Some(lower_bound) != upper_bound && !null_eval {
                row_builder
                    .packer()
                    .extend(self.mfp.mfp.projection.iter().map(|c| datums[*c]));
                let upper_opt =
                    upper_bound.map(|upper_bound| Ok((row_builder.clone(), upper_bound, -diff)));
                let lower = Some(Ok((row_builder.clone(), lower_bound, diff)));
                lower.into_iter().chain(upper_opt)
            } else {
                None.into_iter().chain(None)
            }
        }

        /// Returns true if evaluation could introduce an error on non-error inputs.
        pub fn could_error(&self) -> bool {
            self.mfp.could_error()
                || self.lower_bounds.iter().any(|e| e.could_error())
                || self.upper_bounds.iter().any(|e| e.could_error())
        }
    }
}