mz_expr/
linear.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9use std::collections::{BTreeMap, BTreeSet};
10use std::fmt::Display;
11
12use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
13use mz_repr::{Datum, Row};
14use proptest::prelude::*;
15use serde::{Deserialize, Serialize};
16
17use crate::linear::proto_map_filter_project::ProtoPredicate;
18use crate::visit::Visit;
19use crate::{MirRelationExpr, MirScalarExpr};
20
21include!(concat!(env!("OUT_DIR"), "/mz_expr.linear.rs"));
22
23/// A compound operator that can be applied row-by-row.
24///
25/// This operator integrates the map, filter, and project operators.
26/// It applies a sequences of map expressions, which are allowed to
27/// refer to previous expressions, interleaved with predicates which
28/// must be satisfied for an output to be produced. If all predicates
29/// evaluate to `Datum::True` the data at the identified columns are
30/// collected and produced as output in a packed `Row`.
31///
32/// This operator is a "builder" and its contents may contain expressions
33/// that are not yet executable. For example, it may contain temporal
34/// expressions in `self.expressions`, even though this is not something
35/// we can directly evaluate. The plan creation methods will defensively
36/// ensure that the right thing happens.
37#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Ord, PartialOrd)]
38pub struct MapFilterProject {
39    /// A sequence of expressions that should be appended to the row.
40    ///
41    /// Many of these expressions may not be produced in the output,
42    /// and may only be present as common subexpressions.
43    pub expressions: Vec<MirScalarExpr>,
44    /// Expressions that must evaluate to `Datum::True` for the output
45    /// row to be produced.
46    ///
47    /// Each entry is prepended with a column identifier indicating
48    /// the column *before* which the predicate should first be applied.
49    /// Most commonly this would be one plus the largest column identifier
50    /// in the predicate's support, but it could be larger to implement
51    /// guarded evaluation of predicates.
52    ///
53    /// This list should be sorted by the first field.
54    pub predicates: Vec<(usize, MirScalarExpr)>,
55    /// A sequence of column identifiers whose data form the output row.
56    pub projection: Vec<usize>,
57    /// The expected number of input columns.
58    ///
59    /// This is needed to ensure correct identification of newly formed
60    /// columns in the output.
61    pub input_arity: usize,
62}
63
64/// Use a custom [`Arbitrary`] implementation to cap the number of internal
65/// [`MirScalarExpr`] referenced by the generated [`MapFilterProject`]  instances.
66impl Arbitrary for MapFilterProject {
67    type Parameters = ();
68    type Strategy = BoxedStrategy<Self>;
69
70    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
71        (
72            prop::collection::vec(any::<MirScalarExpr>(), 0..10),
73            prop::collection::vec((any::<usize>(), any::<MirScalarExpr>()), 0..10),
74            prop::collection::vec(any::<usize>(), 0..10),
75            usize::arbitrary(),
76        )
77            .prop_map(
78                |(expressions, predicates, projection, input_arity)| MapFilterProject {
79                    expressions,
80                    predicates,
81                    projection,
82                    input_arity,
83                },
84            )
85            .boxed()
86    }
87}
88
89impl RustType<ProtoMapFilterProject> for MapFilterProject {
90    fn into_proto(&self) -> ProtoMapFilterProject {
91        ProtoMapFilterProject {
92            expressions: self.expressions.into_proto(),
93            predicates: self.predicates.into_proto(),
94            projection: self.projection.into_proto(),
95            input_arity: self.input_arity.into_proto(),
96        }
97    }
98
99    fn from_proto(proto: ProtoMapFilterProject) -> Result<Self, TryFromProtoError> {
100        Ok(MapFilterProject {
101            expressions: proto.expressions.into_rust()?,
102            predicates: proto.predicates.into_rust()?,
103            projection: proto.projection.into_rust()?,
104            input_arity: proto.input_arity.into_rust()?,
105        })
106    }
107}
108
109impl RustType<ProtoPredicate> for (usize, MirScalarExpr) {
110    fn into_proto(&self) -> ProtoPredicate {
111        ProtoPredicate {
112            column_to_apply: self.0.into_proto(),
113            predicate: Some(self.1.into_proto()),
114        }
115    }
116
117    fn from_proto(proto: ProtoPredicate) -> Result<Self, TryFromProtoError> {
118        Ok((
119            proto.column_to_apply.into_rust()?,
120            proto
121                .predicate
122                .into_rust_if_some("ProtoPredicate::predicate")?,
123        ))
124    }
125}
126
127impl Display for MapFilterProject {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        writeln!(f, "MapFilterProject(")?;
130        writeln!(f, "  expressions:")?;
131        self.expressions
132            .iter()
133            .enumerate()
134            .try_for_each(|(i, e)| writeln!(f, "    #{} <- {},", i + self.input_arity, e))?;
135        writeln!(f, "  predicates:")?;
136        self.predicates
137            .iter()
138            .try_for_each(|(before, p)| writeln!(f, "    <before: {}> {},", before, p))?;
139        writeln!(f, "  projection: {:?}", self.projection)?;
140        writeln!(f, "  input_arity: {}", self.input_arity)?;
141        writeln!(f, ")")
142    }
143}
144
145impl MapFilterProject {
146    /// Create a no-op operator for an input of a supplied arity.
147    pub fn new(input_arity: usize) -> Self {
148        Self {
149            expressions: Vec::new(),
150            predicates: Vec::new(),
151            projection: (0..input_arity).collect(),
152            input_arity,
153        }
154    }
155
156    /// Given two mfps, return an mfp that applies one
157    /// followed by the other.
158    /// Note that the arguments are in the opposite order
159    /// from how function composition is usually written in mathematics.
160    pub fn compose(before: Self, after: Self) -> Self {
161        let (m, f, p) = after.into_map_filter_project();
162        before.map(m).filter(f).project(p)
163    }
164
165    /// True if the operator describes the identity transformation.
166    pub fn is_identity(&self) -> bool {
167        self.expressions.is_empty()
168            && self.predicates.is_empty()
169            && self.projection.len() == self.input_arity
170            && self.projection.iter().enumerate().all(|(i, p)| i == *p)
171    }
172
173    /// Retain only the indicated columns in the presented order.
174    pub fn project<I>(mut self, columns: I) -> Self
175    where
176        I: IntoIterator<Item = usize> + std::fmt::Debug,
177    {
178        self.projection = columns.into_iter().map(|c| self.projection[c]).collect();
179        self
180    }
181
182    /// Retain only rows satisfying these predicates.
183    ///
184    /// This method introduces predicates as eagerly as they can be evaluated,
185    /// which may not be desired for predicates that may cause exceptions.
186    /// If fine manipulation is required, the predicates can be added manually.
187    pub fn filter<I>(mut self, predicates: I) -> Self
188    where
189        I: IntoIterator<Item = MirScalarExpr>,
190    {
191        for mut predicate in predicates {
192            // Correct column references.
193            predicate.permute(&self.projection[..]);
194
195            // Validate column references.
196            assert!(
197                predicate
198                    .support()
199                    .into_iter()
200                    .all(|c| c < self.input_arity + self.expressions.len())
201            );
202
203            // Insert predicate as eagerly as it can be evaluated:
204            // just after the largest column in its support is formed.
205            let max_support = predicate
206                .support()
207                .into_iter()
208                .max()
209                .map(|c| c + 1)
210                .unwrap_or(0);
211            self.predicates.push((max_support, predicate))
212        }
213        // Stable sort predicates by position at which they take effect.
214        // We put literal errors at the end as a stop-gap to avoid erroring
215        // before we are able to evaluate any predicates that might prevent it.
216        self.predicates
217            .sort_by_key(|(position, predicate)| (predicate.is_literal_err(), *position));
218        self
219    }
220
221    /// Append the result of evaluating expressions to each row.
222    pub fn map<I>(mut self, expressions: I) -> Self
223    where
224        I: IntoIterator<Item = MirScalarExpr>,
225    {
226        for mut expression in expressions {
227            // Correct column references.
228            expression.permute(&self.projection[..]);
229
230            // Validate column references.
231            assert!(
232                expression
233                    .support()
234                    .into_iter()
235                    .all(|c| c < self.input_arity + self.expressions.len())
236            );
237
238            // Introduce expression and produce as output.
239            self.expressions.push(expression);
240            self.projection
241                .push(self.input_arity + self.expressions.len() - 1);
242        }
243
244        self
245    }
246
247    /// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning.
248    pub fn into_map_filter_project(self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
249        let predicates = self
250            .predicates
251            .into_iter()
252            .map(|(_pos, predicate)| predicate)
253            .collect();
254        (self.expressions, predicates, self.projection)
255    }
256
257    /// As the arguments to `Map`, `Filter`, and `Project` operators.
258    ///
259    /// In principle, this operator can be implemented as a sequence of
260    /// more elemental operators, likely less efficiently.
261    pub fn as_map_filter_project(&self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
262        self.clone().into_map_filter_project()
263    }
264
265    /// Determines if a scalar expression must be equal to a literal datum.
266    pub fn literal_constraint(&self, expr: &MirScalarExpr) -> Option<Datum> {
267        for (_pos, predicate) in self.predicates.iter() {
268            if let MirScalarExpr::CallBinary {
269                func: crate::BinaryFunc::Eq,
270                expr1,
271                expr2,
272            } = predicate
273            {
274                if let Some(Ok(datum1)) = expr1.as_literal() {
275                    if &**expr2 == expr {
276                        return Some(datum1);
277                    }
278                }
279                if let Some(Ok(datum2)) = expr2.as_literal() {
280                    if &**expr1 == expr {
281                        return Some(datum2);
282                    }
283                }
284            }
285        }
286        None
287    }
288
289    /// Determines if a sequence of scalar expressions must be equal to a literal row.
290    ///
291    /// This method returns `None` on an empty `exprs`, which might be surprising, but
292    /// seems to line up with its callers' expectations of that being a non-constraint.
293    /// The caller knows if `exprs` is empty, and can modify their behavior appropriately.
294    /// if they would rather have a literal empty row.
295    pub fn literal_constraints(&self, exprs: &[MirScalarExpr]) -> Option<Row> {
296        if exprs.is_empty() {
297            return None;
298        }
299        let mut row = Row::default();
300        let mut packer = row.packer();
301        for expr in exprs {
302            if let Some(literal) = self.literal_constraint(expr) {
303                packer.push(literal);
304            } else {
305                return None;
306            }
307        }
308        Some(row)
309    }
310
311    /// Extracts any MapFilterProject at the root of the expression.
312    ///
313    /// The expression will be modified to extract any maps, filters, and
314    /// projections, which will be returned as `Self`. If there are no maps,
315    /// filters, or projections the method will return an identity operator.
316    ///
317    /// The extracted expressions may contain temporal predicates, and one
318    /// should be careful to apply them blindly.
319    pub fn extract_from_expression(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
320        // TODO: This could become iterative rather than recursive if
321        // we were able to fuse MFP operators from below, rather than
322        // from above.
323        match expr {
324            MirRelationExpr::Map { input, scalars } => {
325                let (mfp, expr) = Self::extract_from_expression(input);
326                (mfp.map(scalars.iter().cloned()), expr)
327            }
328            MirRelationExpr::Filter { input, predicates } => {
329                let (mfp, expr) = Self::extract_from_expression(input);
330                (mfp.filter(predicates.iter().cloned()), expr)
331            }
332            MirRelationExpr::Project { input, outputs } => {
333                let (mfp, expr) = Self::extract_from_expression(input);
334                (mfp.project(outputs.iter().cloned()), expr)
335            }
336            // TODO: The recursion is quadratic in the number of Map/Filter/Project operators due to
337            // this call to `arity()`.
338            x => (Self::new(x.arity()), x),
339        }
340    }
341
342    /// Extracts an error-free MapFilterProject at the root of the expression.
343    ///
344    /// The expression will be modified to extract maps, filters, and projects
345    /// from the root of the expression, which will be returned as `Self`. The
346    /// extraction will halt if a Map or Filter containing a literal error is
347    /// reached. Otherwise, the method will return an identity operator.
348    ///
349    /// This method is meant to be used during optimization, where it is
350    /// necessary to avoid moving around maps and filters with errors.
351    pub fn extract_non_errors_from_expr(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
352        match expr {
353            MirRelationExpr::Map { input, scalars }
354                if scalars.iter().all(|s| !s.is_literal_err()) =>
355            {
356                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
357                (mfp.map(scalars.iter().cloned()), expr)
358            }
359            MirRelationExpr::Filter { input, predicates }
360                if predicates.iter().all(|p| !p.is_literal_err()) =>
361            {
362                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
363                (mfp.filter(predicates.iter().cloned()), expr)
364            }
365            MirRelationExpr::Project { input, outputs } => {
366                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
367                (mfp.project(outputs.iter().cloned()), expr)
368            }
369            x => (Self::new(x.arity()), x),
370        }
371    }
372
373    /// Extracts an error-free MapFilterProject at the root of the expression.
374    ///
375    /// Differs from [MapFilterProject::extract_non_errors_from_expr] by taking and returning a
376    /// mutable reference.
377    pub fn extract_non_errors_from_expr_ref_mut(
378        expr: &mut MirRelationExpr,
379    ) -> (Self, &mut MirRelationExpr) {
380        // This is essentially the same code as `extract_non_errors_from_expr`, except the seemingly
381        // superfluous outer if, which works around a borrow-checker issue:
382        // https://github.com/rust-lang/rust/issues/54663
383        if matches!(expr, MirRelationExpr::Map { input: _, scalars } if scalars.iter().all(|s| !s.is_literal_err()))
384            || matches!(expr, MirRelationExpr::Filter { input: _, predicates } if predicates.iter().all(|p| !p.is_literal_err()))
385            || matches!(expr, MirRelationExpr::Project { .. })
386        {
387            match expr {
388                MirRelationExpr::Map { input, scalars }
389                    if scalars.iter().all(|s| !s.is_literal_err()) =>
390                {
391                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
392                    (mfp.map(scalars.iter().cloned()), expr)
393                }
394                MirRelationExpr::Filter { input, predicates }
395                    if predicates.iter().all(|p| !p.is_literal_err()) =>
396                {
397                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
398                    (mfp.filter(predicates.iter().cloned()), expr)
399                }
400                MirRelationExpr::Project { input, outputs } => {
401                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
402                    (mfp.project(outputs.iter().cloned()), expr)
403                }
404                _ => unreachable!(),
405            }
406        } else {
407            (Self::new(expr.arity()), expr)
408        }
409    }
410
411    /// Removes an error-free MapFilterProject from the root of the expression.
412    ///
413    /// The expression will be modified to extract maps, filters, and projects
414    /// from the root of the expression, which will be returned as `Self`. The
415    /// extraction will halt if a Map or Filter containing a literal error is
416    /// reached. Otherwise, the method will return an
417    /// identity operator, and the expression will remain unchanged.
418    ///
419    /// This method is meant to be used during optimization, where it is
420    /// necessary to avoid moving around maps and filters with errors.
421    pub fn extract_non_errors_from_expr_mut(expr: &mut MirRelationExpr) -> Self {
422        match expr {
423            MirRelationExpr::Map { input, scalars }
424                if scalars.iter().all(|s| !s.is_literal_err()) =>
425            {
426                let mfp =
427                    Self::extract_non_errors_from_expr_mut(input).map(scalars.iter().cloned());
428                *expr = input.take_dangerous();
429                mfp
430            }
431            MirRelationExpr::Filter { input, predicates }
432                if predicates.iter().all(|p| !p.is_literal_err()) =>
433            {
434                let mfp = Self::extract_non_errors_from_expr_mut(input)
435                    .filter(predicates.iter().cloned());
436                *expr = input.take_dangerous();
437                mfp
438            }
439            MirRelationExpr::Project { input, outputs } => {
440                let mfp =
441                    Self::extract_non_errors_from_expr_mut(input).project(outputs.iter().cloned());
442                *expr = input.take_dangerous();
443                mfp
444            }
445            x => Self::new(x.arity()),
446        }
447    }
448
449    /// Extracts temporal predicates into their own `Self`.
450    ///
451    /// Expressions that are used by the temporal predicates are exposed by `self.projection`,
452    /// though there could be justification for extracting them as well if they are otherwise
453    /// unused.
454    ///
455    /// This separation is valuable when the execution cannot be fused into one operator.
456    pub fn extract_temporal(&mut self) -> Self {
457        // Optimize the expression, as it is only post-optimization that we can be certain
458        // that temporal expressions are restricted to filters. We could relax this in the
459        // future to be only `inline_expressions` and `remove_undemanded`, but optimization
460        // seems to be the best fit at the moment.
461        self.optimize();
462
463        // Assert that we no longer have temporal expressions to evaluate. This should only
464        // occur if the optimization above results with temporal expressions yielded in the
465        // output, which is out of spec for how the type is meant to be used.
466        assert!(!self.expressions.iter().any(|e| e.contains_temporal()));
467
468        // Extract temporal predicates from `self.predicates`.
469        let mut temporal_predicates = Vec::new();
470        self.predicates.retain(|(_position, predicate)| {
471            if predicate.contains_temporal() {
472                temporal_predicates.push(predicate.clone());
473                false
474            } else {
475                true
476            }
477        });
478
479        // Determine extended input columns used by temporal filters.
480        let mut support = BTreeSet::new();
481        for predicate in temporal_predicates.iter() {
482            support.extend(predicate.support());
483        }
484
485        // Discover the locations of these columns after `self.projection`.
486        let old_projection_len = self.projection.len();
487        let mut new_location = BTreeMap::new();
488        for original in support.iter() {
489            if let Some(position) = self.projection.iter().position(|x| x == original) {
490                new_location.insert(*original, position);
491            } else {
492                new_location.insert(*original, self.projection.len());
493                self.projection.push(*original);
494            }
495        }
496        // Permute references in extracted predicates to their new locations.
497        for predicate in temporal_predicates.iter_mut() {
498            predicate.permute_map(&new_location);
499        }
500
501        // Form a new `Self` containing the temporal predicates to return.
502        Self::new(self.projection.len())
503            .filter(temporal_predicates)
504            .project(0..old_projection_len)
505    }
506
507    /// Extracts common expressions from multiple `Self` into a result `Self`.
508    ///
509    /// The argument `mfps` are mutated so that each are functionaly equivalent to their
510    /// corresponding input, when composed atop the resulting `Self`.
511    ///
512    /// The `extract_exprs` argument is temporary, as we roll out the `extract_common_mfp_expressions` flag.
513    pub fn extract_common(mfps: &mut [&mut Self]) -> Self {
514        match mfps.len() {
515            0 => {
516                panic!("Cannot call method on empty arguments");
517            }
518            1 => {
519                let output_arity = mfps[0].projection.len();
520                std::mem::replace(mfps[0], MapFilterProject::new(output_arity))
521            }
522            _ => {
523                // More generally, we convert each mfp to ANF, at which point we can
524                // repeatedly extract atomic expressions that depend only on input
525                // columns, migrate them to an input mfp, and repeat until no such
526                // expressions exist. At this point, we can also migrate predicates
527                // and then determine and push down projections.
528
529                // Prepare a return `Self`.
530                let mut result_mfp = MapFilterProject::new(mfps[0].input_arity);
531
532                // We convert each mfp to ANF, using `memoize_expressions`.
533                for mfp in mfps.iter_mut() {
534                    mfp.memoize_expressions();
535                }
536
537                // We repeatedly extract common expressions, until none remain.
538                let mut done = false;
539                while !done {
540                    // We use references to determine common expressions, and must
541                    // introduce a scope here to drop the borrows before mutation.
542                    let common = {
543                        // The input arity may increase as we iterate, so recapture.
544                        let input_arity = result_mfp.projection.len();
545                        let mut prev: BTreeSet<_> = mfps[0]
546                            .expressions
547                            .iter()
548                            .filter(|e| e.support().iter().max() < Some(&input_arity))
549                            .collect();
550                        let mut next = BTreeSet::default();
551                        for mfp in mfps[1..].iter() {
552                            for expr in mfp.expressions.iter() {
553                                if prev.contains(expr) {
554                                    next.insert(expr);
555                                }
556                            }
557                            std::mem::swap(&mut prev, &mut next);
558                            next.clear();
559                        }
560                        prev.into_iter().cloned().collect::<Vec<_>>()
561                    };
562                    // Without new common expressions, we should terminate the loop.
563                    done = common.is_empty();
564
565                    // Migrate each expression in `common` to `result_mfp`.
566                    for expr in common.into_iter() {
567                        // Update each mfp by removing expr and updating column references.
568                        for mfp in mfps.iter_mut() {
569                            // With `expr` next in `result_mfp`, it is as if we are rotating it to
570                            // be the first expression in `mfp`, and then removing it from `mfp` and
571                            // increasing the input arity of `mfp`.
572                            let arity = result_mfp.projection.len();
573                            let found = mfp.expressions.iter().position(|e| e == &expr).unwrap();
574                            let index = arity + found;
575                            // Column references change due to the rotation from `index` to `arity`.
576                            let action = |c: &mut usize| {
577                                if arity <= *c && *c < index {
578                                    *c += 1;
579                                } else if *c == index {
580                                    *c = arity;
581                                }
582                            };
583                            // Rotate `expr` from `found` to first, and then snip.
584                            // Short circuit by simply removing and incrementing the input arity.
585                            mfp.input_arity += 1;
586                            mfp.expressions.remove(found);
587                            // Update column references in expressions, predicates, and projections.
588                            for e in mfp.expressions.iter_mut() {
589                                e.visit_columns(action);
590                            }
591                            for (o, e) in mfp.predicates.iter_mut() {
592                                e.visit_columns(action);
593                                // Max out the offset for the predicate; optimization will correct.
594                                *o = mfp.input_arity + mfp.expressions.len();
595                            }
596                            for c in mfp.projection.iter_mut() {
597                                action(c);
598                            }
599                        }
600                        // Install the expression and update
601                        result_mfp.expressions.push(expr);
602                        result_mfp.projection.push(result_mfp.projection.len());
603                    }
604                }
605                // As before, but easier: predicates in common to all mfps.
606                let common_preds: Vec<MirScalarExpr> = {
607                    let input_arity = result_mfp.projection.len();
608                    let mut prev: BTreeSet<_> = mfps[0]
609                        .predicates
610                        .iter()
611                        .map(|(_, e)| e)
612                        .filter(|e| e.support().iter().max() < Some(&input_arity))
613                        .collect();
614                    let mut next = BTreeSet::default();
615                    for mfp in mfps[1..].iter() {
616                        for (_, expr) in mfp.predicates.iter() {
617                            if prev.contains(expr) {
618                                next.insert(expr);
619                            }
620                        }
621                        std::mem::swap(&mut prev, &mut next);
622                        next.clear();
623                    }
624                    // Expressions in common, that we will append to `result_mfp.expressions`.
625                    prev.into_iter().cloned().collect::<Vec<_>>()
626                };
627                for mfp in mfps.iter_mut() {
628                    mfp.predicates.retain(|(_, p)| !common_preds.contains(p));
629                    mfp.optimize();
630                }
631                result_mfp.predicates.extend(
632                    common_preds
633                        .into_iter()
634                        .map(|e| (result_mfp.projection.len(), e)),
635                );
636
637                // Then, look for unused columns and project them away.
638                let mut common_demand = BTreeSet::new();
639                for mfp in mfps.iter() {
640                    common_demand.extend(mfp.demand());
641                }
642                // columns in `common_demand` must be retained, but others
643                // may be discarded.
644                let common_demand = (0..result_mfp.projection.len())
645                    .filter(|x| common_demand.contains(x))
646                    .collect::<Vec<_>>();
647                let remap = common_demand
648                    .iter()
649                    .cloned()
650                    .enumerate()
651                    .map(|(new, old)| (old, new))
652                    .collect::<BTreeMap<_, _>>();
653                for mfp in mfps.iter_mut() {
654                    mfp.permute_fn(|c| remap[&c], common_demand.len());
655                }
656                result_mfp = result_mfp.project(common_demand);
657
658                // Return the resulting MFP.
659                result_mfp.optimize();
660                result_mfp
661            }
662        }
663    }
664
665    /// Returns `self`, and leaves behind an identity operator that acts on its output.
666    pub fn take(&mut self) -> Self {
667        let mut identity = Self::new(self.projection.len());
668        std::mem::swap(self, &mut identity);
669        identity
670    }
671
672    /// Convert the `MapFilterProject` into a staged evaluation plan.
673    ///
674    /// The main behavior is extract temporal predicates, which cannot be evaluated
675    /// using the standard machinery.
676    pub fn into_plan(self) -> Result<plan::MfpPlan, String> {
677        plan::MfpPlan::create_from(self)
678    }
679}
680
681impl MapFilterProject {
682    /// Partitions `self` into two instances, one of which can be eagerly applied.
683    ///
684    /// The `available` argument indicates which input columns are available (keys)
685    /// and in which positions (values). This information may allow some maps and
686    /// filters to execute. The `input_arity` argument reports the total number of
687    /// input columns (which may include some not present in `available`)
688    ///
689    /// This method partitions `self` in two parts, `(before, after)`, where `before`
690    /// can be applied on columns present as keys in `available`, and `after` must
691    /// await the introduction of the other input columns.
692    ///
693    /// The `before` instance will *append* any columns that can be determined from
694    /// `available` but will project away any of these columns that are not needed by
695    /// `after`. Importantly, this means that `before` will leave intact *all* input
696    /// columns including those not referenced in `available`.
697    ///
698    /// The `after` instance will presume all input columns are available, followed
699    /// by the appended columns of the `before` instance. It may be that some input
700    /// columns can be projected away in `before` if `after` does not need them, but
701    /// we leave that as something the caller can apply if needed (it is otherwise
702    /// complicated to negotiate which input columns `before` should retain).
703    ///
704    /// To correctly reconstruct `self` from `before` and `after`, one must introduce
705    /// additional input columns, permute all input columns to their locations as
706    /// expected by `self`, follow this by new columns appended by `before`, and
707    /// remove all other columns that may be present.
708    ///
709    /// # Example
710    ///
711    /// ```rust
712    /// use mz_expr::{BinaryFunc, MapFilterProject, MirScalarExpr};
713    ///
714    /// // imagine an action on columns (a, b, c, d).
715    /// let original = MapFilterProject::new(4).map(vec![
716    ///    MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::AddInt64),
717    ///    MirScalarExpr::column(2).call_binary(MirScalarExpr::column(4), BinaryFunc::AddInt64),
718    ///    MirScalarExpr::column(3).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
719    /// ]).project(vec![6]);
720    ///
721    /// // Imagine we start with columns (b, x, a, y, c).
722    /// //
723    /// // The `partition` method requires a map from *expected* input columns to *actual*
724    /// // input columns. In the example above, the columns a, b, and c exist, and are at
725    /// // locations 2, 0, and 4 respectively. We must construct a map to this effect.
726    /// let mut available_columns = std::collections::BTreeMap::new();
727    /// available_columns.insert(0, 2);
728    /// available_columns.insert(1, 0);
729    /// available_columns.insert(2, 4);
730    /// // Partition `original` using the available columns and current input arity.
731    /// // This informs `partition` which columns are available, where they can be found,
732    /// // and how many columns are not relevant but should be preserved.
733    /// let (before, after) = original.partition(available_columns, 5);
734    ///
735    /// // `before` sees all five input columns, and should append `a + b + c`.
736    /// assert_eq!(before, MapFilterProject::new(5).map(vec![
737    ///    MirScalarExpr::column(2).call_binary(MirScalarExpr::column(0), BinaryFunc::AddInt64),
738    ///    MirScalarExpr::column(4).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
739    /// ]).project(vec![0, 1, 2, 3, 4, 6]));
740    ///
741    /// // `after` expects to see `(a, b, c, d, a + b + c)`.
742    /// assert_eq!(after, MapFilterProject::new(5).map(vec![
743    ///    MirScalarExpr::column(3).call_binary(MirScalarExpr::column(4), BinaryFunc::AddInt64)
744    /// ]).project(vec![5]));
745    ///
746    /// // To reconstruct `self`, we must introduce the columns that are not present,
747    /// // and present them in the order intended by `self`. In this example, we must
748    /// // introduce column d and permute the columns so that they begin (a, b, c, d).
749    /// // The columns x and y must be projected away, and any columns introduced by
750    /// // `begin` must be retained in their current order.
751    ///
752    /// // The `after` instance expects to be provided with all inputs, but it
753    /// // may not need all inputs. The `demand()` and `permute()` methods can
754    /// // optimize the representation.
755    /// ```
756    pub fn partition(self, available: BTreeMap<usize, usize>, input_arity: usize) -> (Self, Self) {
757        // Map expressions, filter predicates, and projections for `before` and `after`.
758        let mut before_expr = Vec::new();
759        let mut before_pred = Vec::new();
760        let mut before_proj = Vec::new();
761        let mut after_expr = Vec::new();
762        let mut after_pred = Vec::new();
763        let mut after_proj = Vec::new();
764
765        // Track which output columns must be preserved in the output of `before`.
766        let mut demanded = BTreeSet::new();
767        demanded.extend(0..self.input_arity);
768        demanded.extend(self.projection.iter());
769
770        // Determine which map expressions can be computed from the available subset.
771        // Some expressions may depend on other expressions, but by evaluating them
772        // in forward order we should accurately determine the available expressions.
773        let mut available_expr = vec![false; self.input_arity];
774        // Initialize available columns from `available`, which is then not used again.
775        for index in available.keys() {
776            available_expr[*index] = true;
777        }
778        for expr in self.expressions.into_iter() {
779            // We treat an expression as available if its supporting columns are available,
780            // and if it is not a literal (we want to avoid pushing down literals). This
781            // choice is ad-hoc, but the intent is that we partition the operators so
782            // that we can reduce the row representation size and total computation.
783            // Pushing down literals harms the former and does nothing for the latter.
784            // In the future, we'll want to have a harder think about this trade-off, as
785            // we are certainly making sub-optimal decisions by pushing down all available
786            // work.
787            // TODO(mcsherry): establish better principles about what work to push down.
788            let is_available =
789                expr.support().into_iter().all(|i| available_expr[i]) && !expr.is_literal();
790            if is_available {
791                before_expr.push(expr);
792            } else {
793                demanded.extend(expr.support());
794                after_expr.push(expr);
795            }
796            available_expr.push(is_available);
797        }
798
799        // Determine which predicates can be computed from the available subset.
800        for (_when, pred) in self.predicates.into_iter() {
801            let is_available = pred.support().into_iter().all(|i| available_expr[i]);
802            if is_available {
803                before_pred.push(pred);
804            } else {
805                demanded.extend(pred.support());
806                after_pred.push(pred);
807            }
808        }
809
810        // Map from prior output location to location in un-projected `before`.
811        // This map is used to correct references in `before` but it should be
812        // adjusted to reflect `before`s projection prior to use in `after`.
813        let mut before_map = available;
814        // Input columns include any additional undescribed columns that may
815        // not be captured by the `available` argument, so we must independently
816        // track the current number of columns (vs relying on `before_map.len()`).
817        let mut input_columns = input_arity;
818        for index in self.input_arity..available_expr.len() {
819            if available_expr[index] {
820                before_map.insert(index, input_columns);
821                input_columns += 1;
822            }
823        }
824
825        // Permute the column references in `before` expressions and predicates.
826        for expr in before_expr.iter_mut() {
827            expr.permute_map(&before_map);
828        }
829        for pred in before_pred.iter_mut() {
830            pred.permute_map(&before_map);
831        }
832
833        // Demand information determines `before`s output projection.
834        // Specifically, we produce all input columns in the output, as well as
835        // any columns that are available and demanded.
836        before_proj.extend(0..input_arity);
837        for index in self.input_arity..available_expr.len() {
838            // If an intermediate result is both available and demanded,
839            // we should produce it as output.
840            if available_expr[index] && demanded.contains(&index) {
841                // Use the new location of `index`.
842                before_proj.push(before_map[&index]);
843            }
844        }
845
846        // Map from prior output locations to location in post-`before` columns.
847        // This map is used to correct references in `after`.
848        // The presumption is that `after` will be presented with all input columns,
849        // followed by the output columns introduced by `before` in order.
850        let mut after_map = BTreeMap::new();
851        for index in 0..self.input_arity {
852            after_map.insert(index, index);
853        }
854        for index in self.input_arity..available_expr.len() {
855            // If an intermediate result is both available and demanded,
856            // it was produced as output.
857            if available_expr[index] && demanded.contains(&index) {
858                // We expect to find the output as far after `self.input_arity` as
859                // it was produced after `input_arity` in the output of `before`.
860                let location = self.input_arity
861                    + (before_proj
862                        .iter()
863                        .position(|x| x == &before_map[&index])
864                        .unwrap()
865                        - input_arity);
866                after_map.insert(index, location);
867            }
868        }
869        // We must now re-map the remaining non-demanded expressions, which are
870        // contiguous rather than potentially interspersed.
871        for index in self.input_arity..available_expr.len() {
872            if !available_expr[index] {
873                after_map.insert(index, after_map.len());
874            }
875        }
876
877        // Permute the column references in `after` expressions and predicates.
878        for expr in after_expr.iter_mut() {
879            expr.permute_map(&after_map);
880        }
881        for pred in after_pred.iter_mut() {
882            pred.permute_map(&after_map);
883        }
884        // Populate `after` projection with the new locations of `self.projection`.
885        for index in self.projection {
886            after_proj.push(after_map[&index]);
887        }
888
889        // Form and return the before and after MapFilterProject instances.
890        let before = Self::new(input_arity)
891            .map(before_expr)
892            .filter(before_pred)
893            .project(before_proj.clone());
894        let after = Self::new(self.input_arity + (before_proj.len() - input_arity))
895            .map(after_expr)
896            .filter(after_pred)
897            .project(after_proj);
898        (before, after)
899    }
900
901    /// Lists input columns whose values are used in outputs.
902    ///
903    /// It is entirely appropriate to determine the demand of an instance
904    /// and then both apply a projection to the subject of the instance and
905    /// `self.permute` this instance.
906    pub fn demand(&self) -> BTreeSet<usize> {
907        let mut demanded = BTreeSet::new();
908        for (_index, pred) in self.predicates.iter() {
909            demanded.extend(pred.support());
910        }
911        demanded.extend(self.projection.iter().cloned());
912        for index in (0..self.expressions.len()).rev() {
913            if demanded.contains(&(self.input_arity + index)) {
914                demanded.extend(self.expressions[index].support());
915            }
916        }
917        demanded.retain(|col| col < &self.input_arity);
918        demanded
919    }
920
921    /// Update input column references, due to an input projection or permutation.
922    ///
923    /// The `shuffle` argument remaps expected column identifiers to new locations,
924    /// with the expectation that `shuffle` describes all input columns, and so the
925    /// intermediate results will be able to start at position `shuffle.len()`.
926    ///
927    /// The supplied `shuffle` might not list columns that are not "demanded" by the
928    /// instance, and so we should ensure that `self` is optimized to not reference
929    /// columns that are not demanded.
930    pub fn permute_fn<F>(&mut self, remap: F, new_input_arity: usize)
931    where
932        F: Fn(usize) -> usize,
933    {
934        let (mut map, mut filter, mut project) = self.as_map_filter_project();
935        let map_len = map.len();
936        let action = |col: &mut usize| {
937            if self.input_arity <= *col && *col < self.input_arity + map_len {
938                *col = new_input_arity + (*col - self.input_arity);
939            } else {
940                *col = remap(*col);
941            }
942        };
943        for expr in map.iter_mut() {
944            expr.visit_columns(action);
945        }
946        for pred in filter.iter_mut() {
947            pred.visit_columns(action);
948        }
949        for proj in project.iter_mut() {
950            action(proj);
951            assert!(*proj < new_input_arity + map.len());
952        }
953        *self = Self::new(new_input_arity)
954            .map(map)
955            .filter(filter)
956            .project(project)
957    }
958}
959
960// Optimization routines.
961impl MapFilterProject {
962    /// Optimize the internal expression evaluation order.
963    ///
964    /// This method performs several optimizations that are meant to streamline
965    /// the execution of the `MapFilterProject` instance, but not to alter its
966    /// semantics. This includes extracting expressions that are used multiple
967    /// times, inlining those that are not, and removing expressions that are
968    /// unreferenced.
969    ///
970    /// This method will inline all temporal expressions, and remove any columns
971    /// that are not demanded by the output, which should transform any temporal
972    /// filters to a state where the temporal expressions exist only in the list
973    /// of predicates.
974    ///
975    /// # Example
976    ///
977    /// This example demonstrates how the re-use of one expression, converting
978    /// column 1 from a string to an integer, can be extracted and the results
979    /// shared among the two uses. This example is used for each of the steps
980    /// along the optimization path.
981    ///
982    /// ```rust
983    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
984    /// // Demonstrate extraction of common expressions (here: parsing strings).
985    /// let mut map_filter_project = MapFilterProject::new(5)
986    ///     .map(vec![
987    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
988    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
989    ///     ])
990    ///     .project(vec![3,4,5,6]);
991    ///
992    /// let mut expected_optimized = MapFilterProject::new(5)
993    ///     .map(vec![
994    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
995    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
996    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
997    ///     ])
998    ///     .project(vec![3,4,6,7]);
999    ///
1000    /// // Optimize the expression.
1001    /// map_filter_project.optimize();
1002    ///
1003    /// assert_eq!(
1004    ///     map_filter_project,
1005    ///     expected_optimized,
1006    /// );
1007    /// ```
1008    pub fn optimize(&mut self) {
1009        // Track sizes and iterate as long as they decrease.
1010        let mut prev_size = None;
1011        let mut self_size = usize::max_value();
1012        // Continue as long as strict improvements occur.
1013        while prev_size.map(|p| self_size < p).unwrap_or(true) {
1014            // Lock in current size.
1015            prev_size = Some(self_size);
1016
1017            // We have an annoying pattern of mapping literals that already exist as columns (by filters).
1018            // Try to identify this pattern, of a map that introduces an expression equated to a prior column,
1019            // and then replace the mapped expression by a column reference.
1020            //
1021            // We think this is due to `LiteralLifting`, and we might investigate removing the introduciton in
1022            // the first place. The tell-tale that we see when we fix is a diff that look likes
1023            //
1024            // - Project (#0, #2)
1025            // -   Filter (#1 = 1)
1026            // -     Map (1)
1027            // -       Get l0
1028            // + Filter (#1 = 1)
1029            // +   Get l0
1030            //
1031            for (index, expr) in self.expressions.iter_mut().enumerate() {
1032                // If `expr` matches a filter equating it to a column < index + input_arity, rewrite it
1033                for (_, predicate) in self.predicates.iter() {
1034                    if let MirScalarExpr::CallBinary {
1035                        func: crate::BinaryFunc::Eq,
1036                        expr1,
1037                        expr2,
1038                    } = predicate
1039                    {
1040                        if let MirScalarExpr::Column(c) = &**expr1 {
1041                            if *c < index + self.input_arity && &**expr2 == expr {
1042                                *expr = MirScalarExpr::Column(*c);
1043                            }
1044                        }
1045                        if let MirScalarExpr::Column(c) = &**expr2 {
1046                            if *c < index + self.input_arity && &**expr1 == expr {
1047                                *expr = MirScalarExpr::Column(*c);
1048                            }
1049                        }
1050                    }
1051                }
1052            }
1053
1054            // Optimization memoizes individual `ScalarExpr` expressions that
1055            // are sure to be evaluated, canonicalizes references to the first
1056            // occurrence of each, inlines expressions that have a reference
1057            // count of one, and then removes any expressions that are not
1058            // referenced.
1059            self.memoize_expressions();
1060            self.predicates.sort();
1061            self.predicates.dedup();
1062            self.inline_expressions();
1063            self.remove_undemanded();
1064
1065            // Re-build `self` from parts to restore evaluation order invariants.
1066            let (map, filter, project) = self.as_map_filter_project();
1067            *self = Self::new(self.input_arity)
1068                .map(map)
1069                .filter(filter)
1070                .project(project);
1071
1072            self_size = self.size();
1073        }
1074    }
1075
1076    /// Total expression sizes across all expressions.
1077    pub fn size(&self) -> usize {
1078        self.expressions.iter().map(|e| e.size()).sum::<usize>()
1079            + self.predicates.iter().map(|(_, e)| e.size()).sum::<usize>()
1080    }
1081
1082    /// Place each certainly evaluated expression in its own column.
1083    ///
1084    /// This method places each non-trivial, certainly evaluated expression
1085    /// in its own column, and deduplicates them so that all references to
1086    /// the same expression reference the same column.
1087    ///
1088    /// This transformation is restricted to expressions we are certain will
1089    /// be evaluated, which does not include expressions in `if` statements.
1090    ///
1091    /// # Example
1092    ///
1093    /// This example demonstrates how memoization notices `MirScalarExpr`s
1094    /// that are used multiple times, and ensures that each are extracted
1095    /// into columns and then referenced by column. This pass does not try
1096    /// to minimize the occurrences of column references, which will happen
1097    /// in inlining.
1098    ///
1099    /// ```rust
1100    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1101    /// // Demonstrate extraction of common expressions (here: parsing strings).
1102    /// let mut map_filter_project = MapFilterProject::new(5)
1103    ///     .map(vec![
1104    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
1105    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
1106    ///     ])
1107    ///     .project(vec![3,4,5,6]);
1108    ///
1109    /// let mut expected_optimized = MapFilterProject::new(5)
1110    ///     .map(vec![
1111    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1112    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1113    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
1114    ///         MirScalarExpr::column(7),
1115    ///         MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1116    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), BinaryFunc::AddInt64),
1117    ///         MirScalarExpr::column(10),
1118    ///     ])
1119    ///     .project(vec![3,4,8,11]);
1120    ///
1121    /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1122    /// map_filter_project.memoize_expressions();
1123    ///
1124    /// assert_eq!(
1125    ///     map_filter_project,
1126    ///     expected_optimized,
1127    /// );
1128    /// ```
1129    ///
1130    /// Expressions may not be memoized if they are not certain to be evaluated,
1131    /// for example if they occur in conditional branches of a `MirScalarExpr::If`.
1132    ///
1133    /// ```rust
1134    /// use mz_expr::{MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1135    /// // Demonstrate extraction of unconditionally evaluated expressions, as well as
1136    /// // the non-extraction of common expressions guarded by conditions.
1137    /// let mut map_filter_project = MapFilterProject::new(2)
1138    ///     .map(vec![
1139    ///         MirScalarExpr::If {
1140    ///             cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::Lt)),
1141    ///             then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
1142    ///             els:  Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
1143    ///         },
1144    ///         MirScalarExpr::If {
1145    ///             cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::Lt)),
1146    ///             then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
1147    ///             els:  Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
1148    ///         },
1149    ///     ]);
1150    ///
1151    /// let mut expected_optimized = MapFilterProject::new(2)
1152    ///     .map(vec![
1153    ///         MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::Lt),
1154    ///         MirScalarExpr::If {
1155    ///             cond: Box::new(MirScalarExpr::column(2)),
1156    ///             then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
1157    ///             els:  Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
1158    ///         },
1159    ///         MirScalarExpr::column(3),
1160    ///         MirScalarExpr::If {
1161    ///             cond: Box::new(MirScalarExpr::column(2)),
1162    ///             then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
1163    ///             els:  Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
1164    ///         },
1165    ///         MirScalarExpr::column(5),
1166    ///     ])
1167    ///     .project(vec![0,1,4,6]);
1168    ///
1169    /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1170    /// map_filter_project.memoize_expressions();
1171    ///
1172    /// assert_eq!(
1173    ///     map_filter_project,
1174    ///     expected_optimized,
1175    /// );
1176    /// ```
1177    pub fn memoize_expressions(&mut self) {
1178        // Record the mapping from starting column references to new column
1179        // references.
1180        let mut remaps = BTreeMap::new();
1181        for index in 0..self.input_arity {
1182            remaps.insert(index, index);
1183        }
1184        let mut new_expressions = Vec::new();
1185
1186        // We follow the same order as for evaluation, to ensure that all
1187        // column references exist in time for their evaluation. We could
1188        // prioritize predicates, but we would need to be careful to chase
1189        // down column references to expressions and memoize those as well.
1190        let mut expression = 0;
1191        for (support, predicate) in self.predicates.iter_mut() {
1192            while self.input_arity + expression < *support {
1193                self.expressions[expression].permute_map(&remaps);
1194                memoize_expr(
1195                    &mut self.expressions[expression],
1196                    &mut new_expressions,
1197                    self.input_arity,
1198                );
1199                remaps.insert(
1200                    self.input_arity + expression,
1201                    self.input_arity + new_expressions.len(),
1202                );
1203                new_expressions.push(self.expressions[expression].clone());
1204                expression += 1;
1205            }
1206            predicate.permute_map(&remaps);
1207            memoize_expr(predicate, &mut new_expressions, self.input_arity);
1208        }
1209        while expression < self.expressions.len() {
1210            self.expressions[expression].permute_map(&remaps);
1211            memoize_expr(
1212                &mut self.expressions[expression],
1213                &mut new_expressions,
1214                self.input_arity,
1215            );
1216            remaps.insert(
1217                self.input_arity + expression,
1218                self.input_arity + new_expressions.len(),
1219            );
1220            new_expressions.push(self.expressions[expression].clone());
1221            expression += 1;
1222        }
1223
1224        self.expressions = new_expressions;
1225        for proj in self.projection.iter_mut() {
1226            *proj = remaps[proj];
1227        }
1228
1229        // Restore predicate order invariants.
1230        for (pos, pred) in self.predicates.iter_mut() {
1231            *pos = pred.support().into_iter().max().map(|x| x + 1).unwrap_or(0);
1232        }
1233    }
1234
1235    /// This method inlines expressions with a single use.
1236    ///
1237    /// This method only inlines expressions; it does not delete expressions
1238    /// that are no longer referenced. The `remove_undemanded()` method does
1239    /// that, and should likely be used after this method.
1240    ///
1241    /// Inlining replaces column references when the referred-to item is either
1242    /// another column reference, or the only referrer of its referent. This
1243    /// is most common after memoization has atomized all expressions to seek
1244    /// out re-use: inlining re-assembles expressions that were not helpfully
1245    /// shared with other expressions.
1246    ///
1247    /// # Example
1248    ///
1249    /// In this example, we see that with only a single reference to columns
1250    /// 0 and 2, their parsing can each be inlined. Similarly, column references
1251    /// can be cleaned up among expressions, and in the final projection.
1252    ///
1253    /// Also notice the remaining expressions, which can be cleaned up in a later
1254    /// pass (the `remove_undemanded` method).
1255    ///
1256    /// ```rust
1257    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1258    /// // Use the output from first `memoize_expression` example.
1259    /// let mut map_filter_project = MapFilterProject::new(5)
1260    ///     .map(vec![
1261    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1262    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1263    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
1264    ///         MirScalarExpr::column(7),
1265    ///         MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1266    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), BinaryFunc::AddInt64),
1267    ///         MirScalarExpr::column(10),
1268    ///     ])
1269    ///     .project(vec![3,4,8,11]);
1270    ///
1271    /// let mut expected_optimized = MapFilterProject::new(5)
1272    ///     .map(vec![
1273    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1274    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1275    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
1276    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
1277    ///         MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1278    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
1279    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
1280    ///     ])
1281    ///     .project(vec![3,4,8,11]);
1282    ///
1283    /// // Inline expressions that are referenced only once.
1284    /// map_filter_project.inline_expressions();
1285    ///
1286    /// assert_eq!(
1287    ///     map_filter_project,
1288    ///     expected_optimized,
1289    /// );
1290    /// ```
1291    pub fn inline_expressions(&mut self) {
1292        // Local copy of input_arity to avoid borrowing `self` in closures.
1293        let input_arity = self.input_arity;
1294        // Reference counts track the number of places that a reference occurs.
1295        let mut reference_count = vec![0; input_arity + self.expressions.len()];
1296        // Increment reference counts for each use
1297        for expr in self.expressions.iter() {
1298            expr.visit_pre(|e| {
1299                if let MirScalarExpr::Column(i) = e {
1300                    reference_count[*i] += 1;
1301                }
1302            });
1303        }
1304        for (_, pred) in self.predicates.iter() {
1305            pred.visit_pre(|e| {
1306                if let MirScalarExpr::Column(i) = e {
1307                    reference_count[*i] += 1;
1308                }
1309            });
1310        }
1311        for proj in self.projection.iter() {
1312            reference_count[*proj] += 1;
1313        }
1314
1315        // Determine which expressions should be inlined because they reference temporal expressions.
1316        let mut is_temporal = vec![false; input_arity];
1317        for expr in self.expressions.iter() {
1318            // An express may contain a temporal expression, or reference a column containing such.
1319            is_temporal.push(
1320                expr.contains_temporal() || expr.support().into_iter().any(|col| is_temporal[col]),
1321            );
1322        }
1323
1324        // Inline only those columns that 1. are expressions not inputs, and
1325        // 2a. are column references or literals or 2b. have a refcount of 1,
1326        // or 2c. reference temporal expressions (which cannot be evaluated).
1327        let mut should_inline = vec![false; reference_count.len()];
1328        for i in (input_arity..reference_count.len()).rev() {
1329            if let MirScalarExpr::Column(c) = self.expressions[i - input_arity] {
1330                should_inline[i] = true;
1331                // The reference count of the referenced column should be
1332                // incremented with the number of references
1333                // `self.expressions[i - input_arity]` has.
1334                // Subtract 1 because `self.expressions[i - input_arity]` is
1335                // itself a reference.
1336                reference_count[c] += reference_count[i] - 1;
1337            } else {
1338                should_inline[i] = reference_count[i] == 1 || is_temporal[i];
1339            }
1340        }
1341        // Inline expressions per `should_inline`.
1342        self.perform_inlining(should_inline);
1343        // We can only inline column references in `self.projection`, but we should.
1344        for proj in self.projection.iter_mut() {
1345            if *proj >= self.input_arity {
1346                if let MirScalarExpr::Column(i) = self.expressions[*proj - self.input_arity] {
1347                    *proj = i;
1348                }
1349            }
1350        }
1351    }
1352
1353    /// Inlines those expressions that are indicated by should_inline.
1354    /// See `inline_expressions` for usage.
1355    pub fn perform_inlining(&mut self, should_inline: Vec<bool>) {
1356        for index in 0..self.expressions.len() {
1357            let (prior, expr) = self.expressions.split_at_mut(index);
1358            #[allow(deprecated)]
1359            expr[0].visit_mut_post_nolimit(&mut |e| {
1360                if let MirScalarExpr::Column(i) = e {
1361                    if should_inline[*i] {
1362                        *e = prior[*i - self.input_arity].clone();
1363                    }
1364                }
1365            });
1366        }
1367        for (_index, pred) in self.predicates.iter_mut() {
1368            let expressions = &self.expressions;
1369            #[allow(deprecated)]
1370            pred.visit_mut_post_nolimit(&mut |e| {
1371                if let MirScalarExpr::Column(i) = e {
1372                    if should_inline[*i] {
1373                        *e = expressions[*i - self.input_arity].clone();
1374                    }
1375                }
1376            });
1377        }
1378    }
1379
1380    /// Removes unused expressions from `self.expressions`.
1381    ///
1382    /// Expressions are "used" if they are relied upon by any output columns
1383    /// or any predicates, even transitively. Any expressions that are not
1384    /// relied upon in this way can be discarded.
1385    ///
1386    /// # Example
1387    ///
1388    /// ```rust
1389    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1390    /// // Use the output from `inline_expression` example.
1391    /// let mut map_filter_project = MapFilterProject::new(5)
1392    ///     .map(vec![
1393    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1394    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1395    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
1396    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
1397    ///         MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1398    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
1399    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
1400    ///     ])
1401    ///     .project(vec![3,4,8,11]);
1402    ///
1403    /// let mut expected_optimized = MapFilterProject::new(5)
1404    ///     .map(vec![
1405    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1406    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
1407    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
1408    ///     ])
1409    ///     .project(vec![3,4,6,7]);
1410    ///
1411    /// // Remove undemanded expressions, streamlining the work done..
1412    /// map_filter_project.remove_undemanded();
1413    ///
1414    /// assert_eq!(
1415    ///     map_filter_project,
1416    ///     expected_optimized,
1417    /// );
1418    /// ```
1419    pub fn remove_undemanded(&mut self) {
1420        // Determine the demanded expressions to remove irrelevant ones.
1421        let mut demand = BTreeSet::new();
1422        for (_index, pred) in self.predicates.iter() {
1423            demand.extend(pred.support());
1424        }
1425        // Start from the output columns as presumed demanded.
1426        // If this is not the case, the caller should project some away.
1427        demand.extend(self.projection.iter().cloned());
1428        // Proceed in *reverse* order, as expressions may depend on other
1429        // expressions that precede them.
1430        for index in (0..self.expressions.len()).rev() {
1431            if demand.contains(&(self.input_arity + index)) {
1432                demand.extend(self.expressions[index].support());
1433            }
1434        }
1435
1436        // Maintain a map from initial column identifiers to locations
1437        // once we have removed undemanded expressions.
1438        let mut remap = BTreeMap::new();
1439        // This map only needs to map elements of `demand` to a new location,
1440        // but the logic is easier if we include all input columns (as the
1441        // new position is then determined by the size of the map).
1442        for index in 0..self.input_arity {
1443            remap.insert(index, index);
1444        }
1445        // Retain demanded expressions, and record their new locations.
1446        let mut new_expressions = Vec::new();
1447        for (index, expr) in self.expressions.drain(..).enumerate() {
1448            if demand.contains(&(index + self.input_arity)) {
1449                remap.insert(index + self.input_arity, remap.len());
1450                new_expressions.push(expr);
1451            }
1452        }
1453        self.expressions = new_expressions;
1454
1455        // Update column identifiers; rebuild `Self` to re-establish any invariants.
1456        // We mirror `self.permute(&remap)` but we specifically want to remap columns
1457        // that are produced by `self.expressions` after the input columns.
1458        let (expressions, predicates, projection) = self.as_map_filter_project();
1459        *self = Self::new(self.input_arity)
1460            .map(expressions.into_iter().map(|mut e| {
1461                e.permute_map(&remap);
1462                e
1463            }))
1464            .filter(predicates.into_iter().map(|mut p| {
1465                p.permute_map(&remap);
1466                p
1467            }))
1468            .project(projection.into_iter().map(|c| remap[&c]));
1469    }
1470}
1471
1472// TODO: move this elsewhere?
1473/// Recursively memoize parts of `expr`, storing those parts in `memoized_parts`.
1474///
1475/// A part of `expr` that is memoized is replaced by a reference to column
1476/// `(input_arity + pos)`, where `pos` is the position of the memoized part in
1477/// `memoized_parts`, and `input_arity` is the arity of the input that `expr`
1478/// refers to.
1479pub fn memoize_expr(
1480    expr: &mut MirScalarExpr,
1481    memoized_parts: &mut Vec<MirScalarExpr>,
1482    input_arity: usize,
1483) {
1484    #[allow(deprecated)]
1485    expr.visit_mut_pre_post_nolimit(
1486        &mut |e| {
1487            // We should not eagerly memoize `if` branches that might not be taken.
1488            // TODO: Memoize expressions in the intersection of `then` and `els`.
1489            if let MirScalarExpr::If { cond, .. } = e {
1490                return Some(vec![cond]);
1491            }
1492            None
1493        },
1494        &mut |e| {
1495            match e {
1496                MirScalarExpr::Literal(_, _) => {
1497                    // Literals do not need to be memoized.
1498                }
1499                MirScalarExpr::Column(col) => {
1500                    // Column references do not need to be memoized, but may need to be
1501                    // updated if they reference a column reference themselves.
1502                    if *col > input_arity {
1503                        if let MirScalarExpr::Column(col2) = memoized_parts[*col - input_arity] {
1504                            *col = col2;
1505                        }
1506                    }
1507                }
1508                _ => {
1509                    if let Some(position) = memoized_parts.iter().position(|e2| e2 == e) {
1510                        // Any complex expression that already exists as a prior column can
1511                        // be replaced by a reference to that column.
1512                        *e = MirScalarExpr::Column(input_arity + position);
1513                    } else {
1514                        // A complex expression that does not exist should be memoized, and
1515                        // replaced by a reference to the column.
1516                        memoized_parts.push(std::mem::replace(
1517                            e,
1518                            MirScalarExpr::Column(input_arity + memoized_parts.len()),
1519                        ));
1520                    }
1521                }
1522            }
1523        },
1524    )
1525}
1526
1527pub mod util {
1528    use std::collections::BTreeMap;
1529
1530    use crate::MirScalarExpr;
1531
1532    #[allow(dead_code)]
1533    /// A triple of actions that map from rows to (key, val) pairs and back again.
1534    struct KeyValRowMapping {
1535        /// Expressions to apply to a row to produce key datums.
1536        to_key: Vec<MirScalarExpr>,
1537        /// Columns to project from a row to produce residual value datums.
1538        to_val: Vec<usize>,
1539        /// Columns to project from the concatenation of key and value to reconstruct the row.
1540        to_row: Vec<usize>,
1541    }
1542
1543    /// Derive supporting logic to support transforming rows to (key, val) pairs,
1544    /// and back again.
1545    ///
1546    /// We are given as input a list of key expressions and an input arity, and the
1547    /// requirement the produced key should be the application of the key expressions.
1548    /// To produce the `val` output, we will identify those input columns not found in
1549    /// the key expressions, and name all other columns.
1550    /// To reconstitute the original row, we identify the sequence of columns from the
1551    /// concatenation of key and val which would reconstruct the original row.
1552    ///
1553    /// The output is a pair of column sequences, the first used to reconstruct a row
1554    /// from the concatenation of key and value, and the second to identify the columns
1555    /// of a row that should become the value associated with its key.
1556    ///
1557    /// The permutations and thinning expressions generated here will be tracked in
1558    /// `dataflow::plan::AvailableCollections`; see the
1559    /// documentation there for more details.
1560    pub fn permutation_for_arrangement(
1561        key: &[MirScalarExpr],
1562        unthinned_arity: usize,
1563    ) -> (Vec<usize>, Vec<usize>) {
1564        let columns_in_key: BTreeMap<_, _> = key
1565            .iter()
1566            .enumerate()
1567            .filter_map(|(i, key_col)| key_col.as_column().map(|c| (c, i)))
1568            .collect();
1569        let mut input_cursor = key.len();
1570        let permutation = (0..unthinned_arity)
1571            .map(|c| {
1572                if let Some(c) = columns_in_key.get(&c) {
1573                    // Column is in key (and thus gone from the value
1574                    // of the thinned representation)
1575                    *c
1576                } else {
1577                    // Column remains in value of the thinned representation
1578                    input_cursor += 1;
1579                    input_cursor - 1
1580                }
1581            })
1582            .collect();
1583        let thinning = (0..unthinned_arity)
1584            .filter(|c| !columns_in_key.contains_key(c))
1585            .collect();
1586        (permutation, thinning)
1587    }
1588
1589    /// Given the permutations (see [`permutation_for_arrangement`] and
1590    /// (`dataflow::plan::AvailableCollections`) corresponding to two
1591    /// collections with the same key arity,
1592    /// computes the permutation for the result of joining them.
1593    pub fn join_permutations(
1594        key_arity: usize,
1595        stream_permutation: Vec<usize>,
1596        thinned_stream_arity: usize,
1597        lookup_permutation: Vec<usize>,
1598    ) -> BTreeMap<usize, usize> {
1599        let stream_arity = stream_permutation.len();
1600        let lookup_arity = lookup_permutation.len();
1601
1602        (0..stream_arity + lookup_arity)
1603            .map(|i| {
1604                let location = if i < stream_arity {
1605                    stream_permutation[i]
1606                } else {
1607                    let location_in_lookup = lookup_permutation[i - stream_arity];
1608                    if location_in_lookup < key_arity {
1609                        location_in_lookup
1610                    } else {
1611                        location_in_lookup + thinned_stream_arity
1612                    }
1613                };
1614                (i, location)
1615            })
1616            .collect()
1617    }
1618}
1619
1620pub mod plan {
1621    use std::iter;
1622
1623    use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
1624    use mz_repr::{Datum, Diff, Row, RowArena};
1625    use proptest::prelude::*;
1626    use proptest_derive::Arbitrary;
1627    use serde::{Deserialize, Serialize};
1628
1629    use crate::{
1630        BinaryFunc, EvalError, MapFilterProject, MirScalarExpr, ProtoMfpPlan, ProtoSafeMfpPlan,
1631        UnaryFunc, UnmaterializableFunc, func,
1632    };
1633
1634    /// A wrapper type which indicates it is safe to simply evaluate all expressions.
1635    #[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
1636    pub struct SafeMfpPlan {
1637        pub(crate) mfp: MapFilterProject,
1638    }
1639
1640    impl RustType<ProtoSafeMfpPlan> for SafeMfpPlan {
1641        fn into_proto(&self) -> ProtoSafeMfpPlan {
1642            ProtoSafeMfpPlan {
1643                mfp: Some(self.mfp.into_proto()),
1644            }
1645        }
1646
1647        fn from_proto(proto: ProtoSafeMfpPlan) -> Result<Self, TryFromProtoError> {
1648            Ok(SafeMfpPlan {
1649                mfp: proto.mfp.into_rust_if_some("ProtoSafeMfpPlan::mfp")?,
1650            })
1651        }
1652    }
1653
1654    impl SafeMfpPlan {
1655        /// Remaps references to input columns according to `remap`.
1656        ///
1657        /// Leaves other column references, e.g. to newly mapped columns, unchanged.
1658        pub fn permute_fn<F>(&mut self, remap: F, new_arity: usize)
1659        where
1660            F: Fn(usize) -> usize,
1661        {
1662            self.mfp.permute_fn(remap, new_arity);
1663        }
1664        /// Evaluates the linear operator on a supplied list of datums.
1665        ///
1666        /// The arguments are the initial datums associated with the row,
1667        /// and an appropriately lifetimed arena for temporary allocations
1668        /// needed by scalar evaluation.
1669        ///
1670        /// An `Ok` result will either be `None` if any predicate did not
1671        /// evaluate to `Datum::True`, or the values of the columns listed
1672        /// by `self.projection` if all predicates passed. If an error
1673        /// occurs in the evaluation it is returned as an `Err` variant.
1674        /// As the evaluation exits early with failed predicates, it may
1675        /// miss some errors that would occur later in evaluation.
1676        ///
1677        /// The `row` is not cleared first, but emptied if the function
1678        /// returns `Ok(Some(row)).
1679        #[inline(always)]
1680        pub fn evaluate_into<'a, 'row>(
1681            &'a self,
1682            datums: &mut Vec<Datum<'a>>,
1683            arena: &'a RowArena,
1684            row_buf: &'row mut Row,
1685        ) -> Result<Option<&'row Row>, EvalError> {
1686            let passed_predicates = self.evaluate_inner(datums, arena)?;
1687            if !passed_predicates {
1688                Ok(None)
1689            } else {
1690                row_buf
1691                    .packer()
1692                    .extend(self.mfp.projection.iter().map(|c| datums[*c]));
1693                Ok(Some(row_buf))
1694            }
1695        }
1696
1697        /// A version of `evaluate` which produces an iterator over `Datum`
1698        /// as output.
1699        ///
1700        /// This version can be useful when one wants to capture the resulting
1701        /// datums without packing and then unpacking a row.
1702        #[inline(always)]
1703        pub fn evaluate_iter<'b, 'a: 'b>(
1704            &'a self,
1705            datums: &'b mut Vec<Datum<'a>>,
1706            arena: &'a RowArena,
1707        ) -> Result<Option<impl Iterator<Item = Datum<'a>> + 'b>, EvalError> {
1708            let passed_predicates = self.evaluate_inner(datums, arena)?;
1709            if !passed_predicates {
1710                Ok(None)
1711            } else {
1712                Ok(Some(self.mfp.projection.iter().map(move |i| datums[*i])))
1713            }
1714        }
1715
1716        /// Populates `datums` with `self.expressions` and tests `self.predicates`.
1717        ///
1718        /// This does not apply `self.projection`, which is up to the calling method.
1719        pub fn evaluate_inner<'b, 'a: 'b>(
1720            &'a self,
1721            datums: &'b mut Vec<Datum<'a>>,
1722            arena: &'a RowArena,
1723        ) -> Result<bool, EvalError> {
1724            let mut expression = 0;
1725            for (support, predicate) in self.mfp.predicates.iter() {
1726                while self.mfp.input_arity + expression < *support {
1727                    datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
1728                    expression += 1;
1729                }
1730                if predicate.eval(&datums[..], arena)? != Datum::True {
1731                    return Ok(false);
1732                }
1733            }
1734            while expression < self.mfp.expressions.len() {
1735                datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
1736                expression += 1;
1737            }
1738            Ok(true)
1739        }
1740
1741        /// Returns true if evaluation could introduce an error on non-error inputs.
1742        pub fn could_error(&self) -> bool {
1743            self.mfp.predicates.iter().any(|(_pos, e)| e.could_error())
1744                || self.mfp.expressions.iter().any(|e| e.could_error())
1745        }
1746
1747        /// Returns true when `Self` is the identity.
1748        pub fn is_identity(&self) -> bool {
1749            self.mfp.is_identity()
1750        }
1751    }
1752
1753    impl std::ops::Deref for SafeMfpPlan {
1754        type Target = MapFilterProject;
1755        fn deref(&self) -> &Self::Target {
1756            &self.mfp
1757        }
1758    }
1759
1760    /// Predicates partitioned into temporal and non-temporal.
1761    ///
1762    /// Temporal predicates require some recognition to determine their
1763    /// structure, and it is best to do that once and re-use the results.
1764    ///
1765    /// There are restrictions on the temporal predicates we currently support.
1766    /// They must directly constrain `MzNow` from below or above,
1767    /// by expressions that do not themselves contain `MzNow`.
1768    /// Conjunctions of such constraints are also ok.
1769    #[derive(Arbitrary, Clone, Debug, PartialEq)]
1770    pub struct MfpPlan {
1771        /// Normal predicates to evaluate on `&[Datum]` and expect `Ok(Datum::True)`.
1772        pub(crate) mfp: SafeMfpPlan,
1773        /// Expressions that when evaluated lower-bound `MzNow`.
1774        #[proptest(strategy = "prop::collection::vec(any::<MirScalarExpr>(), 0..2)")]
1775        pub(crate) lower_bounds: Vec<MirScalarExpr>,
1776        /// Expressions that when evaluated upper-bound `MzNow`.
1777        #[proptest(strategy = "prop::collection::vec(any::<MirScalarExpr>(), 0..2)")]
1778        pub(crate) upper_bounds: Vec<MirScalarExpr>,
1779    }
1780
1781    impl RustType<ProtoMfpPlan> for MfpPlan {
1782        fn into_proto(&self) -> ProtoMfpPlan {
1783            ProtoMfpPlan {
1784                mfp: Some(self.mfp.into_proto()),
1785                lower_bounds: self.lower_bounds.into_proto(),
1786                upper_bounds: self.upper_bounds.into_proto(),
1787            }
1788        }
1789
1790        fn from_proto(proto: ProtoMfpPlan) -> Result<Self, TryFromProtoError> {
1791            Ok(MfpPlan {
1792                mfp: proto.mfp.into_rust_if_some("ProtoMfpPlan::mfp")?,
1793                lower_bounds: proto.lower_bounds.into_rust()?,
1794                upper_bounds: proto.upper_bounds.into_rust()?,
1795            })
1796        }
1797    }
1798
1799    impl MfpPlan {
1800        /// Partitions `predicates` into non-temporal, and lower and upper temporal bounds.
1801        ///
1802        /// The first returned list is of predicates that do not contain `mz_now`.
1803        /// The second and third returned lists contain expressions that, once evaluated, lower
1804        /// and upper bound the validity interval of a record, respectively. These second two
1805        /// lists are populated only by binary expressions of the form
1806        /// ```ignore
1807        /// mz_now cmp_op expr
1808        /// ```
1809        /// where `cmp_op` is a comparison operator and `expr` does not contain `mz_now`.
1810        ///
1811        /// If any unsupported expression is found, for example one that uses `mz_now`
1812        /// in an unsupported position, an error is returned.
1813        pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, String> {
1814            let mut lower_bounds = Vec::new();
1815            let mut upper_bounds = Vec::new();
1816
1817            let mut temporal = Vec::new();
1818
1819            // Optimize, to ensure that temporal predicates are move in to `mfp.predicates`.
1820            mfp.optimize();
1821
1822            mfp.predicates.retain(|(_position, predicate)| {
1823                if predicate.contains_temporal() {
1824                    temporal.push(predicate.clone());
1825                    false
1826                } else {
1827                    true
1828                }
1829            });
1830
1831            for predicate in temporal.into_iter() {
1832                // Supported temporal predicates are exclusively binary operators.
1833                if let MirScalarExpr::CallBinary {
1834                    mut func,
1835                    mut expr1,
1836                    mut expr2,
1837                } = predicate
1838                {
1839                    // Attempt to put `LogicalTimestamp` in the first argument position.
1840                    if !expr1.contains_temporal()
1841                        && *expr2
1842                            == MirScalarExpr::CallUnmaterializable(UnmaterializableFunc::MzNow)
1843                    {
1844                        std::mem::swap(&mut expr1, &mut expr2);
1845                        func = match func {
1846                            BinaryFunc::Eq => BinaryFunc::Eq,
1847                            BinaryFunc::Lt => BinaryFunc::Gt,
1848                            BinaryFunc::Lte => BinaryFunc::Gte,
1849                            BinaryFunc::Gt => BinaryFunc::Lt,
1850                            BinaryFunc::Gte => BinaryFunc::Lte,
1851                            x => {
1852                                return Err(format!(
1853                                    "Unsupported binary temporal operation: {:?}",
1854                                    x
1855                                ));
1856                            }
1857                        };
1858                    }
1859
1860                    // Error if MLT is referenced in an unsupported position.
1861                    if expr2.contains_temporal()
1862                        || *expr1
1863                            != MirScalarExpr::CallUnmaterializable(UnmaterializableFunc::MzNow)
1864                    {
1865                        return Err(format!(
1866                            "Unsupported temporal predicate. Note: `mz_now()` must be directly compared to a mz_timestamp-castable expression. Expression found: {}",
1867                            MirScalarExpr::CallBinary { func, expr1, expr2 },
1868                        ));
1869                    }
1870
1871                    // LogicalTimestamp <OP> <EXPR2> for several supported operators.
1872                    match func {
1873                        BinaryFunc::Eq => {
1874                            lower_bounds.push(*expr2.clone());
1875                            upper_bounds.push(
1876                                expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
1877                            );
1878                        }
1879                        BinaryFunc::Lt => {
1880                            upper_bounds.push(*expr2.clone());
1881                        }
1882                        BinaryFunc::Lte => {
1883                            upper_bounds.push(
1884                                expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
1885                            );
1886                        }
1887                        BinaryFunc::Gt => {
1888                            lower_bounds.push(
1889                                expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
1890                            );
1891                        }
1892                        BinaryFunc::Gte => {
1893                            lower_bounds.push(*expr2.clone());
1894                        }
1895                        _ => {
1896                            return Err(format!(
1897                                "Unsupported binary temporal operation: {:?}",
1898                                func
1899                            ));
1900                        }
1901                    }
1902                } else {
1903                    return Err(format!(
1904                        "Unsupported temporal predicate. Note: `mz_now()` must be directly compared to a non-temporal expression of mz_timestamp-castable type. Expression found: {}",
1905                        predicate,
1906                    ));
1907                }
1908            }
1909
1910            Ok(Self {
1911                mfp: SafeMfpPlan { mfp },
1912                lower_bounds,
1913                upper_bounds,
1914            })
1915        }
1916
1917        /// Indicates if the planned `MapFilterProject` emits exactly its inputs as outputs.
1918        pub fn is_identity(&self) -> bool {
1919            self.mfp.mfp.is_identity()
1920                && self.lower_bounds.is_empty()
1921                && self.upper_bounds.is_empty()
1922        }
1923
1924        /// Returns `self`, and leaves behind an identity operator that acts on its output.
1925        pub fn take(&mut self) -> Self {
1926            let mut identity = Self {
1927                mfp: SafeMfpPlan {
1928                    mfp: MapFilterProject::new(self.mfp.projection.len()),
1929                },
1930                lower_bounds: Default::default(),
1931                upper_bounds: Default::default(),
1932            };
1933            std::mem::swap(self, &mut identity);
1934            identity
1935        }
1936
1937        /// Attempt to convert self into a non-temporal MapFilterProject plan.
1938        ///
1939        /// If that is not possible, the original instance is returned as an error.
1940        #[allow(clippy::result_large_err)]
1941        pub fn into_nontemporal(self) -> Result<SafeMfpPlan, Self> {
1942            if self.lower_bounds.is_empty() && self.upper_bounds.is_empty() {
1943                Ok(self.mfp)
1944            } else {
1945                Err(self)
1946            }
1947        }
1948
1949        /// Returns an iterator over mutable references to all non-temporal
1950        /// scalar expressions in the plan.
1951        ///
1952        /// The order of iteration is unspecified.
1953        pub fn iter_nontemporal_exprs(&mut self) -> impl Iterator<Item = &mut MirScalarExpr> {
1954            iter::empty()
1955                .chain(self.mfp.mfp.predicates.iter_mut().map(|(_, expr)| expr))
1956                .chain(&mut self.mfp.mfp.expressions)
1957                .chain(&mut self.lower_bounds)
1958                .chain(&mut self.upper_bounds)
1959        }
1960
1961        /// Evaluate the predicates, temporal and non-, and return times and differences for `data`.
1962        ///
1963        /// If `self` contains only non-temporal predicates, the result will either be `(time, diff)`,
1964        /// or an evaluation error. If `self contains temporal predicates, the results can be times
1965        /// that are greater than the input `time`, and may contain negated `diff` values.
1966        ///
1967        /// The `row_builder` is not cleared first, but emptied if the function
1968        /// returns an iterator with any `Ok(_)` element.
1969        pub fn evaluate<'b, 'a: 'b, E: From<EvalError>, V: Fn(&mz_repr::Timestamp) -> bool>(
1970            &'a self,
1971            datums: &'b mut Vec<Datum<'a>>,
1972            arena: &'a RowArena,
1973            time: mz_repr::Timestamp,
1974            diff: Diff,
1975            valid_time: V,
1976            row_builder: &mut Row,
1977        ) -> impl Iterator<
1978            Item = Result<(Row, mz_repr::Timestamp, Diff), (E, mz_repr::Timestamp, Diff)>,
1979        > + use<E, V> {
1980            match self.mfp.evaluate_inner(datums, arena) {
1981                Err(e) => {
1982                    return Some(Err((e.into(), time, diff))).into_iter().chain(None);
1983                }
1984                Ok(true) => {}
1985                Ok(false) => {
1986                    return None.into_iter().chain(None);
1987                }
1988            }
1989
1990            // Lower and upper bounds.
1991            let mut lower_bound = time;
1992            let mut upper_bound = None;
1993
1994            // Track whether we have seen a null in either bound, as this should
1995            // prevent the record from being produced at any time.
1996            let mut null_eval = false;
1997
1998            // Advance our lower bound to be at least the result of any lower bound
1999            // expressions.
2000            for l in self.lower_bounds.iter() {
2001                match l.eval(datums, arena) {
2002                    Err(e) => {
2003                        return Some(Err((e.into(), time, diff)))
2004                            .into_iter()
2005                            .chain(None.into_iter());
2006                    }
2007                    Ok(Datum::MzTimestamp(d)) => {
2008                        lower_bound = lower_bound.max(d);
2009                    }
2010                    Ok(Datum::Null) => {
2011                        null_eval = true;
2012                    }
2013                    x => {
2014                        panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
2015                    }
2016                }
2017            }
2018
2019            // If the lower bound exceeds our `until` frontier, it should not appear in the output.
2020            if !valid_time(&lower_bound) {
2021                return None.into_iter().chain(None);
2022            }
2023
2024            // If there are any upper bounds, determine the minimum upper bound.
2025            for u in self.upper_bounds.iter() {
2026                // We can cease as soon as the lower and upper bounds match,
2027                // as the update will certainly not be produced in that case.
2028                if upper_bound != Some(lower_bound) {
2029                    match u.eval(datums, arena) {
2030                        Err(e) => {
2031                            return Some(Err((e.into(), time, diff)))
2032                                .into_iter()
2033                                .chain(None.into_iter());
2034                        }
2035                        Ok(Datum::MzTimestamp(d)) => {
2036                            if let Some(upper) = upper_bound {
2037                                upper_bound = Some(upper.min(d));
2038                            } else {
2039                                upper_bound = Some(d);
2040                            };
2041                            // Force the upper bound to be at least the lower
2042                            // bound. The `is_some()` test should always be true
2043                            // due to the above block, but maintain it here in
2044                            // case that changes. It's hopefully optimized away.
2045                            if upper_bound.is_some() && upper_bound < Some(lower_bound) {
2046                                upper_bound = Some(lower_bound);
2047                            }
2048                        }
2049                        Ok(Datum::Null) => {
2050                            null_eval = true;
2051                        }
2052                        x => {
2053                            panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
2054                        }
2055                    }
2056                }
2057            }
2058
2059            // If the upper bound exceeds our `until` frontier, it should not appear in the output.
2060            if let Some(upper) = &mut upper_bound {
2061                if !valid_time(upper) {
2062                    upper_bound = None;
2063                }
2064            }
2065
2066            // Produce an output only if the upper bound exceeds the lower bound,
2067            // and if we did not encounter a `null` in our evaluation.
2068            if Some(lower_bound) != upper_bound && !null_eval {
2069                row_builder
2070                    .packer()
2071                    .extend(self.mfp.mfp.projection.iter().map(|c| datums[*c]));
2072                let upper_opt =
2073                    upper_bound.map(|upper_bound| Ok((row_builder.clone(), upper_bound, -diff)));
2074                let lower = Some(Ok((row_builder.clone(), lower_bound, diff)));
2075                lower.into_iter().chain(upper_opt)
2076            } else {
2077                None.into_iter().chain(None)
2078            }
2079        }
2080
2081        /// Returns true if evaluation could introduce an error on non-error inputs.
2082        pub fn could_error(&self) -> bool {
2083            self.mfp.could_error()
2084                || self.lower_bounds.iter().any(|e| e.could_error())
2085                || self.upper_bounds.iter().any(|e| e.could_error())
2086        }
2087
2088        /// Indicates that `Self` ignores its input to the extent that it can be evaluated on `&[]`.
2089        ///
2090        /// At the moment, this is only true if it projects away all columns and applies no filters,
2091        /// but it could be extended to plans that produce literals independent of the input.
2092        pub fn ignores_input(&self) -> bool {
2093            self.lower_bounds.is_empty()
2094                && self.upper_bounds.is_empty()
2095                && self.mfp.mfp.projection.is_empty()
2096                && self.mfp.mfp.predicates.is_empty()
2097        }
2098    }
2099}
2100
2101#[cfg(test)]
2102mod tests {
2103    use mz_ore::assert_ok;
2104    use mz_proto::protobuf_roundtrip;
2105
2106    use crate::linear::plan::*;
2107
2108    use super::*;
2109
2110    proptest! {
2111        #![proptest_config(ProptestConfig::with_cases(32))]
2112
2113        #[mz_ore::test]
2114        #[cfg_attr(miri, ignore)] // too slow
2115        fn mfp_plan_protobuf_roundtrip(expect in any::<MfpPlan>()) {
2116            let actual = protobuf_roundtrip::<_, ProtoMfpPlan>(&expect);
2117            assert_ok!(actual);
2118            assert_eq!(actual.unwrap(), expect);
2119        }
2120    }
2121}