mz_expr/
linear.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9use std::collections::{BTreeMap, BTreeSet};
10use std::fmt::Display;
11
12use mz_repr::{Datum, Row};
13use serde::{Deserialize, Serialize};
14
15use crate::visit::Visit;
16use crate::{MirRelationExpr, MirScalarExpr};
17
18/// A compound operator that can be applied row-by-row.
19///
20/// This operator integrates the map, filter, and project operators.
21/// It applies a sequences of map expressions, which are allowed to
22/// refer to previous expressions, interleaved with predicates which
23/// must be satisfied for an output to be produced. If all predicates
24/// evaluate to `Datum::True` the data at the identified columns are
25/// collected and produced as output in a packed `Row`.
26///
27/// This operator is a "builder" and its contents may contain expressions
28/// that are not yet executable. For example, it may contain temporal
29/// expressions in `self.expressions`, even though this is not something
30/// we can directly evaluate. The plan creation methods will defensively
31/// ensure that the right thing happens.
32#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Ord, PartialOrd)]
33pub struct MapFilterProject {
34    /// A sequence of expressions that should be appended to the row.
35    ///
36    /// Many of these expressions may not be produced in the output,
37    /// and may only be present as common subexpressions.
38    pub expressions: Vec<MirScalarExpr>,
39    /// Expressions that must evaluate to `Datum::True` for the output
40    /// row to be produced.
41    ///
42    /// Each entry is prepended with a column identifier indicating
43    /// the column *before* which the predicate should first be applied.
44    /// Most commonly this would be one plus the largest column identifier
45    /// in the predicate's support, but it could be larger to implement
46    /// guarded evaluation of predicates.
47    ///
48    /// This list should be sorted by the first field.
49    pub predicates: Vec<(usize, MirScalarExpr)>,
50    /// A sequence of column identifiers whose data form the output row.
51    pub projection: Vec<usize>,
52    /// The expected number of input columns.
53    ///
54    /// This is needed to ensure correct identification of newly formed
55    /// columns in the output.
56    pub input_arity: usize,
57}
58
59impl Display for MapFilterProject {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        writeln!(f, "MapFilterProject(")?;
62        writeln!(f, "  expressions:")?;
63        self.expressions
64            .iter()
65            .enumerate()
66            .try_for_each(|(i, e)| writeln!(f, "    #{} <- {},", i + self.input_arity, e))?;
67        writeln!(f, "  predicates:")?;
68        self.predicates
69            .iter()
70            .try_for_each(|(before, p)| writeln!(f, "    <before: {}> {},", before, p))?;
71        writeln!(f, "  projection: {:?}", self.projection)?;
72        writeln!(f, "  input_arity: {}", self.input_arity)?;
73        writeln!(f, ")")
74    }
75}
76
77impl MapFilterProject {
78    /// Create a no-op operator for an input of a supplied arity.
79    pub fn new(input_arity: usize) -> Self {
80        Self {
81            expressions: Vec::new(),
82            predicates: Vec::new(),
83            projection: (0..input_arity).collect(),
84            input_arity,
85        }
86    }
87
88    /// Given two mfps, return an mfp that applies one
89    /// followed by the other.
90    /// Note that the arguments are in the opposite order
91    /// from how function composition is usually written in mathematics.
92    pub fn compose(before: Self, after: Self) -> Self {
93        let (m, f, p) = after.into_map_filter_project();
94        before.map(m).filter(f).project(p)
95    }
96
97    /// True if the operator describes the identity transformation.
98    pub fn is_identity(&self) -> bool {
99        self.expressions.is_empty()
100            && self.predicates.is_empty()
101            && self.projection.len() == self.input_arity
102            && self.projection.iter().enumerate().all(|(i, p)| i == *p)
103    }
104
105    /// Retain only the indicated columns in the presented order.
106    pub fn project<I>(mut self, columns: I) -> Self
107    where
108        I: IntoIterator<Item = usize> + std::fmt::Debug,
109    {
110        self.projection = columns.into_iter().map(|c| self.projection[c]).collect();
111        self
112    }
113
114    /// Retain only rows satisfying these predicates.
115    ///
116    /// This method introduces predicates as eagerly as they can be evaluated,
117    /// which may not be desired for predicates that may cause exceptions.
118    /// If fine manipulation is required, the predicates can be added manually.
119    pub fn filter<I>(mut self, predicates: I) -> Self
120    where
121        I: IntoIterator<Item = MirScalarExpr>,
122    {
123        for mut predicate in predicates {
124            // Correct column references.
125            predicate.permute(&self.projection[..]);
126
127            // Validate column references.
128            assert!(
129                predicate
130                    .support()
131                    .into_iter()
132                    .all(|c| c < self.input_arity + self.expressions.len())
133            );
134
135            // Insert predicate as eagerly as it can be evaluated:
136            // just after the largest column in its support is formed.
137            let max_support = predicate
138                .support()
139                .into_iter()
140                .max()
141                .map(|c| c + 1)
142                .unwrap_or(0);
143            self.predicates.push((max_support, predicate))
144        }
145        // Stable sort predicates by position at which they take effect.
146        // We put literal errors at the end as a stop-gap to avoid erroring
147        // before we are able to evaluate any predicates that might prevent it.
148        self.predicates
149            .sort_by_key(|(position, predicate)| (predicate.is_literal_err(), *position));
150        self
151    }
152
153    /// Append the result of evaluating expressions to each row.
154    pub fn map<I>(mut self, expressions: I) -> Self
155    where
156        I: IntoIterator<Item = MirScalarExpr>,
157    {
158        for mut expression in expressions {
159            // Correct column references.
160            expression.permute(&self.projection[..]);
161
162            // Validate column references.
163            assert!(
164                expression
165                    .support()
166                    .into_iter()
167                    .all(|c| c < self.input_arity + self.expressions.len())
168            );
169
170            // Introduce expression and produce as output.
171            self.expressions.push(expression);
172            self.projection
173                .push(self.input_arity + self.expressions.len() - 1);
174        }
175
176        self
177    }
178
179    /// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning.
180    pub fn into_map_filter_project(self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
181        let predicates = self
182            .predicates
183            .into_iter()
184            .map(|(_pos, predicate)| predicate)
185            .collect();
186        (self.expressions, predicates, self.projection)
187    }
188
189    /// As the arguments to `Map`, `Filter`, and `Project` operators.
190    ///
191    /// In principle, this operator can be implemented as a sequence of
192    /// more elemental operators, likely less efficiently.
193    pub fn as_map_filter_project(&self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
194        self.clone().into_map_filter_project()
195    }
196
197    /// Determines if a scalar expression must be equal to a literal datum.
198    pub fn literal_constraint(&self, expr: &MirScalarExpr) -> Option<Datum<'_>> {
199        for (_pos, predicate) in self.predicates.iter() {
200            if let MirScalarExpr::CallBinary {
201                func: crate::BinaryFunc::Eq(_),
202                expr1,
203                expr2,
204            } = predicate
205            {
206                if let Some(Ok(datum1)) = expr1.as_literal() {
207                    if &**expr2 == expr {
208                        return Some(datum1);
209                    }
210                }
211                if let Some(Ok(datum2)) = expr2.as_literal() {
212                    if &**expr1 == expr {
213                        return Some(datum2);
214                    }
215                }
216            }
217        }
218        None
219    }
220
221    /// Determines if a sequence of scalar expressions must be equal to a literal row.
222    ///
223    /// This method returns `None` on an empty `exprs`, which might be surprising, but
224    /// seems to line up with its callers' expectations of that being a non-constraint.
225    /// The caller knows if `exprs` is empty, and can modify their behavior appropriately.
226    /// if they would rather have a literal empty row.
227    pub fn literal_constraints(&self, exprs: &[MirScalarExpr]) -> Option<Row> {
228        if exprs.is_empty() {
229            return None;
230        }
231        let mut row = Row::default();
232        let mut packer = row.packer();
233        for expr in exprs {
234            if let Some(literal) = self.literal_constraint(expr) {
235                packer.push(literal);
236            } else {
237                return None;
238            }
239        }
240        Some(row)
241    }
242
243    /// Extracts any MapFilterProject at the root of the expression.
244    ///
245    /// The expression will be modified to extract any maps, filters, and
246    /// projections, which will be returned as `Self`. If there are no maps,
247    /// filters, or projections the method will return an identity operator.
248    ///
249    /// The extracted expressions may contain temporal predicates, and one
250    /// should be careful to apply them blindly.
251    pub fn extract_from_expression(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
252        // TODO: This could become iterative rather than recursive if
253        // we were able to fuse MFP operators from below, rather than
254        // from above.
255        match expr {
256            MirRelationExpr::Map { input, scalars } => {
257                let (mfp, expr) = Self::extract_from_expression(input);
258                (mfp.map(scalars.iter().cloned()), expr)
259            }
260            MirRelationExpr::Filter { input, predicates } => {
261                let (mfp, expr) = Self::extract_from_expression(input);
262                (mfp.filter(predicates.iter().cloned()), expr)
263            }
264            MirRelationExpr::Project { input, outputs } => {
265                let (mfp, expr) = Self::extract_from_expression(input);
266                (mfp.project(outputs.iter().cloned()), expr)
267            }
268            // TODO: The recursion is quadratic in the number of Map/Filter/Project operators due to
269            // this call to `arity()`.
270            x => (Self::new(x.arity()), x),
271        }
272    }
273
274    /// Extracts an error-free MapFilterProject at the root of the expression.
275    ///
276    /// The expression will be modified to extract maps, filters, and projects
277    /// from the root of the expression, which will be returned as `Self`. The
278    /// extraction will halt if a Map or Filter containing a literal error is
279    /// reached. Otherwise, the method will return an identity operator.
280    ///
281    /// This method is meant to be used during optimization, where it is
282    /// necessary to avoid moving around maps and filters with errors.
283    pub fn extract_non_errors_from_expr(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
284        match expr {
285            MirRelationExpr::Map { input, scalars }
286                if scalars.iter().all(|s| !s.is_literal_err()) =>
287            {
288                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
289                (mfp.map(scalars.iter().cloned()), expr)
290            }
291            MirRelationExpr::Filter { input, predicates }
292                if predicates.iter().all(|p| !p.is_literal_err()) =>
293            {
294                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
295                (mfp.filter(predicates.iter().cloned()), expr)
296            }
297            MirRelationExpr::Project { input, outputs } => {
298                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
299                (mfp.project(outputs.iter().cloned()), expr)
300            }
301            x => (Self::new(x.arity()), x),
302        }
303    }
304
305    /// Extracts an error-free MapFilterProject at the root of the expression.
306    ///
307    /// Differs from [MapFilterProject::extract_non_errors_from_expr] by taking and returning a
308    /// mutable reference.
309    pub fn extract_non_errors_from_expr_ref_mut(
310        expr: &mut MirRelationExpr,
311    ) -> (Self, &mut MirRelationExpr) {
312        // This is essentially the same code as `extract_non_errors_from_expr`, except the seemingly
313        // superfluous outer if, which works around a borrow-checker issue:
314        // https://github.com/rust-lang/rust/issues/54663
315        if matches!(expr, MirRelationExpr::Map { input: _, scalars } if scalars.iter().all(|s| !s.is_literal_err()))
316            || matches!(expr, MirRelationExpr::Filter { input: _, predicates } if predicates.iter().all(|p| !p.is_literal_err()))
317            || matches!(expr, MirRelationExpr::Project { .. })
318        {
319            match expr {
320                MirRelationExpr::Map { input, scalars }
321                    if scalars.iter().all(|s| !s.is_literal_err()) =>
322                {
323                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
324                    (mfp.map(scalars.iter().cloned()), expr)
325                }
326                MirRelationExpr::Filter { input, predicates }
327                    if predicates.iter().all(|p| !p.is_literal_err()) =>
328                {
329                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
330                    (mfp.filter(predicates.iter().cloned()), expr)
331                }
332                MirRelationExpr::Project { input, outputs } => {
333                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
334                    (mfp.project(outputs.iter().cloned()), expr)
335                }
336                _ => unreachable!(),
337            }
338        } else {
339            (Self::new(expr.arity()), expr)
340        }
341    }
342
343    /// Removes an error-free MapFilterProject from the root of the expression.
344    ///
345    /// The expression will be modified to extract maps, filters, and projects
346    /// from the root of the expression, which will be returned as `Self`. The
347    /// extraction will halt if a Map or Filter containing a literal error is
348    /// reached. Otherwise, the method will return an
349    /// identity operator, and the expression will remain unchanged.
350    ///
351    /// This method is meant to be used during optimization, where it is
352    /// necessary to avoid moving around maps and filters with errors.
353    pub fn extract_non_errors_from_expr_mut(expr: &mut MirRelationExpr) -> Self {
354        match expr {
355            MirRelationExpr::Map { input, scalars }
356                if scalars.iter().all(|s| !s.is_literal_err()) =>
357            {
358                let mfp =
359                    Self::extract_non_errors_from_expr_mut(input).map(scalars.iter().cloned());
360                *expr = input.take_dangerous();
361                mfp
362            }
363            MirRelationExpr::Filter { input, predicates }
364                if predicates.iter().all(|p| !p.is_literal_err()) =>
365            {
366                let mfp = Self::extract_non_errors_from_expr_mut(input)
367                    .filter(predicates.iter().cloned());
368                *expr = input.take_dangerous();
369                mfp
370            }
371            MirRelationExpr::Project { input, outputs } => {
372                let mfp =
373                    Self::extract_non_errors_from_expr_mut(input).project(outputs.iter().cloned());
374                *expr = input.take_dangerous();
375                mfp
376            }
377            x => Self::new(x.arity()),
378        }
379    }
380
381    /// Extracts temporal predicates into their own `Self`.
382    ///
383    /// Expressions that are used by the temporal predicates are exposed by `self.projection`,
384    /// though there could be justification for extracting them as well if they are otherwise
385    /// unused.
386    ///
387    /// This separation is valuable when the execution cannot be fused into one operator.
388    pub fn extract_temporal(&mut self) -> Self {
389        // Optimize the expression, as it is only post-optimization that we can be certain
390        // that temporal expressions are restricted to filters. We could relax this in the
391        // future to be only `inline_expressions` and `remove_undemanded`, but optimization
392        // seems to be the best fit at the moment.
393        self.optimize();
394
395        // Assert that we no longer have temporal expressions to evaluate. This should only
396        // occur if the optimization above results with temporal expressions yielded in the
397        // output, which is out of spec for how the type is meant to be used.
398        assert!(!self.expressions.iter().any(|e| e.contains_temporal()));
399
400        // Extract temporal predicates from `self.predicates`.
401        let mut temporal_predicates = Vec::new();
402        self.predicates.retain(|(_position, predicate)| {
403            if predicate.contains_temporal() {
404                temporal_predicates.push(predicate.clone());
405                false
406            } else {
407                true
408            }
409        });
410
411        // Determine extended input columns used by temporal filters.
412        let mut support = BTreeSet::new();
413        for predicate in temporal_predicates.iter() {
414            support.extend(predicate.support());
415        }
416
417        // Discover the locations of these columns after `self.projection`.
418        let old_projection_len = self.projection.len();
419        let mut new_location = BTreeMap::new();
420        for original in support.iter() {
421            if let Some(position) = self.projection.iter().position(|x| x == original) {
422                new_location.insert(*original, position);
423            } else {
424                new_location.insert(*original, self.projection.len());
425                self.projection.push(*original);
426            }
427        }
428        // Permute references in extracted predicates to their new locations.
429        for predicate in temporal_predicates.iter_mut() {
430            predicate.permute_map(&new_location);
431        }
432
433        // Form a new `Self` containing the temporal predicates to return.
434        Self::new(self.projection.len())
435            .filter(temporal_predicates)
436            .project(0..old_projection_len)
437    }
438
439    /// Extracts common expressions from multiple `Self` into a result `Self`.
440    ///
441    /// The argument `mfps` are mutated so that each are functionaly equivalent to their
442    /// corresponding input, when composed atop the resulting `Self`.
443    ///
444    /// The `extract_exprs` argument is temporary, as we roll out the `extract_common_mfp_expressions` flag.
445    pub fn extract_common(mfps: &mut [&mut Self]) -> Self {
446        match mfps.len() {
447            0 => {
448                panic!("Cannot call method on empty arguments");
449            }
450            1 => {
451                let output_arity = mfps[0].projection.len();
452                std::mem::replace(mfps[0], MapFilterProject::new(output_arity))
453            }
454            _ => {
455                // More generally, we convert each mfp to ANF, at which point we can
456                // repeatedly extract atomic expressions that depend only on input
457                // columns, migrate them to an input mfp, and repeat until no such
458                // expressions exist. At this point, we can also migrate predicates
459                // and then determine and push down projections.
460
461                // Prepare a return `Self`.
462                let mut result_mfp = MapFilterProject::new(mfps[0].input_arity);
463
464                // We convert each mfp to ANF, using `memoize_expressions`.
465                for mfp in mfps.iter_mut() {
466                    mfp.memoize_expressions();
467                }
468
469                // We repeatedly extract common expressions, until none remain.
470                let mut done = false;
471                while !done {
472                    // We use references to determine common expressions, and must
473                    // introduce a scope here to drop the borrows before mutation.
474                    let common = {
475                        // The input arity may increase as we iterate, so recapture.
476                        let input_arity = result_mfp.projection.len();
477                        let mut prev: BTreeSet<_> = mfps[0]
478                            .expressions
479                            .iter()
480                            .filter(|e| e.support().iter().max() < Some(&input_arity))
481                            .collect();
482                        let mut next = BTreeSet::default();
483                        for mfp in mfps[1..].iter() {
484                            for expr in mfp.expressions.iter() {
485                                if prev.contains(expr) {
486                                    next.insert(expr);
487                                }
488                            }
489                            std::mem::swap(&mut prev, &mut next);
490                            next.clear();
491                        }
492                        prev.into_iter().cloned().collect::<Vec<_>>()
493                    };
494                    // Without new common expressions, we should terminate the loop.
495                    done = common.is_empty();
496
497                    // Migrate each expression in `common` to `result_mfp`.
498                    for expr in common.into_iter() {
499                        // Update each mfp by removing expr and updating column references.
500                        for mfp in mfps.iter_mut() {
501                            // With `expr` next in `result_mfp`, it is as if we are rotating it to
502                            // be the first expression in `mfp`, and then removing it from `mfp` and
503                            // increasing the input arity of `mfp`.
504                            let arity = result_mfp.projection.len();
505                            let found = mfp.expressions.iter().position(|e| e == &expr).unwrap();
506                            let index = arity + found;
507                            // Column references change due to the rotation from `index` to `arity`.
508                            let action = |c: &mut usize| {
509                                if arity <= *c && *c < index {
510                                    *c += 1;
511                                } else if *c == index {
512                                    *c = arity;
513                                }
514                            };
515                            // Rotate `expr` from `found` to first, and then snip.
516                            // Short circuit by simply removing and incrementing the input arity.
517                            mfp.input_arity += 1;
518                            mfp.expressions.remove(found);
519                            // Update column references in expressions, predicates, and projections.
520                            for e in mfp.expressions.iter_mut() {
521                                e.visit_columns(action);
522                            }
523                            for (o, e) in mfp.predicates.iter_mut() {
524                                e.visit_columns(action);
525                                // Max out the offset for the predicate; optimization will correct.
526                                *o = mfp.input_arity + mfp.expressions.len();
527                            }
528                            for c in mfp.projection.iter_mut() {
529                                action(c);
530                            }
531                        }
532                        // Install the expression and update
533                        result_mfp.expressions.push(expr);
534                        result_mfp.projection.push(result_mfp.projection.len());
535                    }
536                }
537                // As before, but easier: predicates in common to all mfps.
538                let common_preds: Vec<MirScalarExpr> = {
539                    let input_arity = result_mfp.projection.len();
540                    let mut prev: BTreeSet<_> = mfps[0]
541                        .predicates
542                        .iter()
543                        .map(|(_, e)| e)
544                        .filter(|e| e.support().iter().max() < Some(&input_arity))
545                        .collect();
546                    let mut next = BTreeSet::default();
547                    for mfp in mfps[1..].iter() {
548                        for (_, expr) in mfp.predicates.iter() {
549                            if prev.contains(expr) {
550                                next.insert(expr);
551                            }
552                        }
553                        std::mem::swap(&mut prev, &mut next);
554                        next.clear();
555                    }
556                    // Expressions in common, that we will append to `result_mfp.expressions`.
557                    prev.into_iter().cloned().collect::<Vec<_>>()
558                };
559                for mfp in mfps.iter_mut() {
560                    mfp.predicates.retain(|(_, p)| !common_preds.contains(p));
561                    mfp.optimize();
562                }
563                result_mfp.predicates.extend(
564                    common_preds
565                        .into_iter()
566                        .map(|e| (result_mfp.projection.len(), e)),
567                );
568
569                // Then, look for unused columns and project them away.
570                let mut common_demand = BTreeSet::new();
571                for mfp in mfps.iter() {
572                    common_demand.extend(mfp.demand());
573                }
574                // columns in `common_demand` must be retained, but others
575                // may be discarded.
576                let common_demand = (0..result_mfp.projection.len())
577                    .filter(|x| common_demand.contains(x))
578                    .collect::<Vec<_>>();
579                let remap = common_demand
580                    .iter()
581                    .cloned()
582                    .enumerate()
583                    .map(|(new, old)| (old, new))
584                    .collect::<BTreeMap<_, _>>();
585                for mfp in mfps.iter_mut() {
586                    mfp.permute_fn(|c| remap[&c], common_demand.len());
587                }
588                result_mfp = result_mfp.project(common_demand);
589
590                // Return the resulting MFP.
591                result_mfp.optimize();
592                result_mfp
593            }
594        }
595    }
596
597    /// Returns `self`, and leaves behind an identity operator that acts on its output.
598    pub fn take(&mut self) -> Self {
599        let mut identity = Self::new(self.projection.len());
600        std::mem::swap(self, &mut identity);
601        identity
602    }
603
604    /// Convert the `MapFilterProject` into a staged evaluation plan.
605    ///
606    /// The main behavior is extract temporal predicates, which cannot be evaluated
607    /// using the standard machinery.
608    pub fn into_plan(self) -> Result<plan::MfpPlan, String> {
609        plan::MfpPlan::create_from(self)
610    }
611}
612
613impl MapFilterProject {
614    /// Partitions `self` into two instances, one of which can be eagerly applied.
615    ///
616    /// The `available` argument indicates which input columns are available (keys)
617    /// and in which positions (values). This information may allow some maps and
618    /// filters to execute. The `input_arity` argument reports the total number of
619    /// input columns (which may include some not present in `available`)
620    ///
621    /// This method partitions `self` in two parts, `(before, after)`, where `before`
622    /// can be applied on columns present as keys in `available`, and `after` must
623    /// await the introduction of the other input columns.
624    ///
625    /// The `before` instance will *append* any columns that can be determined from
626    /// `available` but will project away any of these columns that are not needed by
627    /// `after`. Importantly, this means that `before` will leave intact *all* input
628    /// columns including those not referenced in `available`.
629    ///
630    /// The `after` instance will presume all input columns are available, followed
631    /// by the appended columns of the `before` instance. It may be that some input
632    /// columns can be projected away in `before` if `after` does not need them, but
633    /// we leave that as something the caller can apply if needed (it is otherwise
634    /// complicated to negotiate which input columns `before` should retain).
635    ///
636    /// To correctly reconstruct `self` from `before` and `after`, one must introduce
637    /// additional input columns, permute all input columns to their locations as
638    /// expected by `self`, follow this by new columns appended by `before`, and
639    /// remove all other columns that may be present.
640    ///
641    /// # Example
642    ///
643    /// ```rust
644    /// use mz_expr::{BinaryFunc, MapFilterProject, MirScalarExpr, func};
645    ///
646    /// // imagine an action on columns (a, b, c, d).
647    /// let original = MapFilterProject::new(4).map(vec![
648    ///    MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::AddInt64),
649    ///    MirScalarExpr::column(2).call_binary(MirScalarExpr::column(4), func::AddInt64),
650    ///    MirScalarExpr::column(3).call_binary(MirScalarExpr::column(5), func::AddInt64),
651    /// ]).project(vec![6]);
652    ///
653    /// // Imagine we start with columns (b, x, a, y, c).
654    /// //
655    /// // The `partition` method requires a map from *expected* input columns to *actual*
656    /// // input columns. In the example above, the columns a, b, and c exist, and are at
657    /// // locations 2, 0, and 4 respectively. We must construct a map to this effect.
658    /// let mut available_columns = std::collections::BTreeMap::new();
659    /// available_columns.insert(0, 2);
660    /// available_columns.insert(1, 0);
661    /// available_columns.insert(2, 4);
662    /// // Partition `original` using the available columns and current input arity.
663    /// // This informs `partition` which columns are available, where they can be found,
664    /// // and how many columns are not relevant but should be preserved.
665    /// let (before, after) = original.partition(available_columns, 5);
666    ///
667    /// // `before` sees all five input columns, and should append `a + b + c`.
668    /// assert_eq!(before, MapFilterProject::new(5).map(vec![
669    ///    MirScalarExpr::column(2).call_binary(MirScalarExpr::column(0), func::AddInt64),
670    ///    MirScalarExpr::column(4).call_binary(MirScalarExpr::column(5), func::AddInt64),
671    /// ]).project(vec![0, 1, 2, 3, 4, 6]));
672    ///
673    /// // `after` expects to see `(a, b, c, d, a + b + c)`.
674    /// assert_eq!(after, MapFilterProject::new(5).map(vec![
675    ///    MirScalarExpr::column(3).call_binary(MirScalarExpr::column(4), func::AddInt64)
676    /// ]).project(vec![5]));
677    ///
678    /// // To reconstruct `self`, we must introduce the columns that are not present,
679    /// // and present them in the order intended by `self`. In this example, we must
680    /// // introduce column d and permute the columns so that they begin (a, b, c, d).
681    /// // The columns x and y must be projected away, and any columns introduced by
    /// // `before` must be retained in their current order.
683    ///
684    /// // The `after` instance expects to be provided with all inputs, but it
685    /// // may not need all inputs. The `demand()` and `permute()` methods can
686    /// // optimize the representation.
687    /// ```
688    pub fn partition(self, available: BTreeMap<usize, usize>, input_arity: usize) -> (Self, Self) {
689        // Map expressions, filter predicates, and projections for `before` and `after`.
690        let mut before_expr = Vec::new();
691        let mut before_pred = Vec::new();
692        let mut before_proj = Vec::new();
693        let mut after_expr = Vec::new();
694        let mut after_pred = Vec::new();
695        let mut after_proj = Vec::new();
696
697        // Track which output columns must be preserved in the output of `before`.
698        let mut demanded = BTreeSet::new();
699        demanded.extend(0..self.input_arity);
700        demanded.extend(self.projection.iter());
701
702        // Determine which map expressions can be computed from the available subset.
703        // Some expressions may depend on other expressions, but by evaluating them
704        // in forward order we should accurately determine the available expressions.
705        let mut available_expr = vec![false; self.input_arity];
706        // Initialize available columns from `available`, which is then not used again.
707        for index in available.keys() {
708            available_expr[*index] = true;
709        }
710        for expr in self.expressions.into_iter() {
711            // We treat an expression as available if its supporting columns are available,
712            // and if it is not a literal (we want to avoid pushing down literals). This
713            // choice is ad-hoc, but the intent is that we partition the operators so
714            // that we can reduce the row representation size and total computation.
715            // Pushing down literals harms the former and does nothing for the latter.
716            // In the future, we'll want to have a harder think about this trade-off, as
717            // we are certainly making sub-optimal decisions by pushing down all available
718            // work.
719            // TODO(mcsherry): establish better principles about what work to push down.
720            let is_available =
721                expr.support().into_iter().all(|i| available_expr[i]) && !expr.is_literal();
722            if is_available {
723                before_expr.push(expr);
724            } else {
725                demanded.extend(expr.support());
726                after_expr.push(expr);
727            }
728            available_expr.push(is_available);
729        }
730
731        // Determine which predicates can be computed from the available subset.
732        for (_when, pred) in self.predicates.into_iter() {
733            let is_available = pred.support().into_iter().all(|i| available_expr[i]);
734            if is_available {
735                before_pred.push(pred);
736            } else {
737                demanded.extend(pred.support());
738                after_pred.push(pred);
739            }
740        }
741
742        // Map from prior output location to location in un-projected `before`.
743        // This map is used to correct references in `before` but it should be
744        // adjusted to reflect `before`s projection prior to use in `after`.
745        let mut before_map = available;
746        // Input columns include any additional undescribed columns that may
747        // not be captured by the `available` argument, so we must independently
748        // track the current number of columns (vs relying on `before_map.len()`).
749        let mut input_columns = input_arity;
750        for index in self.input_arity..available_expr.len() {
751            if available_expr[index] {
752                before_map.insert(index, input_columns);
753                input_columns += 1;
754            }
755        }
756
757        // Permute the column references in `before` expressions and predicates.
758        for expr in before_expr.iter_mut() {
759            expr.permute_map(&before_map);
760        }
761        for pred in before_pred.iter_mut() {
762            pred.permute_map(&before_map);
763        }
764
765        // Demand information determines `before`s output projection.
766        // Specifically, we produce all input columns in the output, as well as
767        // any columns that are available and demanded.
768        before_proj.extend(0..input_arity);
769        for index in self.input_arity..available_expr.len() {
770            // If an intermediate result is both available and demanded,
771            // we should produce it as output.
772            if available_expr[index] && demanded.contains(&index) {
773                // Use the new location of `index`.
774                before_proj.push(before_map[&index]);
775            }
776        }
777
778        // Map from prior output locations to location in post-`before` columns.
779        // This map is used to correct references in `after`.
780        // The presumption is that `after` will be presented with all input columns,
781        // followed by the output columns introduced by `before` in order.
782        let mut after_map = BTreeMap::new();
783        for index in 0..self.input_arity {
784            after_map.insert(index, index);
785        }
786        for index in self.input_arity..available_expr.len() {
787            // If an intermediate result is both available and demanded,
788            // it was produced as output.
789            if available_expr[index] && demanded.contains(&index) {
790                // We expect to find the output as far after `self.input_arity` as
791                // it was produced after `input_arity` in the output of `before`.
792                let location = self.input_arity
793                    + (before_proj
794                        .iter()
795                        .position(|x| x == &before_map[&index])
796                        .unwrap()
797                        - input_arity);
798                after_map.insert(index, location);
799            }
800        }
801        // We must now re-map the remaining non-demanded expressions, which are
802        // contiguous rather than potentially interspersed.
803        for index in self.input_arity..available_expr.len() {
804            if !available_expr[index] {
805                after_map.insert(index, after_map.len());
806            }
807        }
808
809        // Permute the column references in `after` expressions and predicates.
810        for expr in after_expr.iter_mut() {
811            expr.permute_map(&after_map);
812        }
813        for pred in after_pred.iter_mut() {
814            pred.permute_map(&after_map);
815        }
816        // Populate `after` projection with the new locations of `self.projection`.
817        for index in self.projection {
818            after_proj.push(after_map[&index]);
819        }
820
821        // Form and return the before and after MapFilterProject instances.
822        let before = Self::new(input_arity)
823            .map(before_expr)
824            .filter(before_pred)
825            .project(before_proj.clone());
826        let after = Self::new(self.input_arity + (before_proj.len() - input_arity))
827            .map(after_expr)
828            .filter(after_pred)
829            .project(after_proj);
830        (before, after)
831    }
832
833    /// Lists input columns whose values are used in outputs.
834    ///
835    /// It is entirely appropriate to determine the demand of an instance
836    /// and then both apply a projection to the subject of the instance and
837    /// `self.permute` this instance.
838    pub fn demand(&self) -> BTreeSet<usize> {
839        let mut demanded = BTreeSet::new();
840        for (_index, pred) in self.predicates.iter() {
841            demanded.extend(pred.support());
842        }
843        demanded.extend(self.projection.iter().cloned());
844        for index in (0..self.expressions.len()).rev() {
845            if demanded.contains(&(self.input_arity + index)) {
846                demanded.extend(self.expressions[index].support());
847            }
848        }
849        demanded.retain(|col| col < &self.input_arity);
850        demanded
851    }
852
853    /// Update input column references, due to an input projection or permutation.
854    ///
855    /// The `shuffle` argument remaps expected column identifiers to new locations,
856    /// with the expectation that `shuffle` describes all input columns, and so the
857    /// intermediate results will be able to start at position `shuffle.len()`.
858    ///
859    /// The supplied `shuffle` might not list columns that are not "demanded" by the
860    /// instance, and so we should ensure that `self` is optimized to not reference
861    /// columns that are not demanded.
862    pub fn permute_fn<F>(&mut self, remap: F, new_input_arity: usize)
863    where
864        F: Fn(usize) -> usize,
865    {
866        let (mut map, mut filter, mut project) = self.as_map_filter_project();
867        let map_len = map.len();
868        let action = |col: &mut usize| {
869            if self.input_arity <= *col && *col < self.input_arity + map_len {
870                *col = new_input_arity + (*col - self.input_arity);
871            } else {
872                *col = remap(*col);
873            }
874        };
875        for expr in map.iter_mut() {
876            expr.visit_columns(action);
877        }
878        for pred in filter.iter_mut() {
879            pred.visit_columns(action);
880        }
881        for proj in project.iter_mut() {
882            action(proj);
883            assert!(*proj < new_input_arity + map.len());
884        }
885        *self = Self::new(new_input_arity)
886            .map(map)
887            .filter(filter)
888            .project(project)
889    }
890}
891
892// Optimization routines.
893impl MapFilterProject {
894    /// Optimize the internal expression evaluation order.
895    ///
896    /// This method performs several optimizations that are meant to streamline
897    /// the execution of the `MapFilterProject` instance, but not to alter its
898    /// semantics. This includes extracting expressions that are used multiple
899    /// times, inlining those that are not, and removing expressions that are
900    /// unreferenced.
901    ///
902    /// This method will inline all temporal expressions, and remove any columns
903    /// that are not demanded by the output, which should transform any temporal
904    /// filters to a state where the temporal expressions exist only in the list
905    /// of predicates.
906    ///
907    /// # Example
908    ///
909    /// This example demonstrates how the re-use of one expression, converting
910    /// column 1 from a string to an integer, can be extracted and the results
911    /// shared among the two uses. This example is used for each of the steps
912    /// along the optimization path.
913    ///
914    /// ```rust
915    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
916    /// // Demonstrate extraction of common expressions (here: parsing strings).
917    /// let mut map_filter_project = MapFilterProject::new(5)
918    ///     .map(vec![
919    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(1).call_unary(func::CastStringToInt64), func::AddInt64),
920    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
921    ///     ])
922    ///     .project(vec![3,4,5,6]);
923    ///
924    /// let mut expected_optimized = MapFilterProject::new(5)
925    ///     .map(vec![
926    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
927    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(5), func::AddInt64),
928    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
929    ///     ])
930    ///     .project(vec![3,4,6,7]);
931    ///
932    /// // Optimize the expression.
933    /// map_filter_project.optimize();
934    ///
935    /// assert_eq!(
936    ///     map_filter_project,
937    ///     expected_optimized,
938    /// );
939    /// ```
940    pub fn optimize(&mut self) {
941        // Track sizes and iterate as long as they decrease.
942        let mut prev_size = None;
943        let mut self_size = usize::max_value();
944        // Continue as long as strict improvements occur.
945        while prev_size.map(|p| self_size < p).unwrap_or(true) {
946            // Lock in current size.
947            prev_size = Some(self_size);
948
949            // We have an annoying pattern of mapping literals that already exist as columns (by filters).
950            // Try to identify this pattern, of a map that introduces an expression equated to a prior column,
951            // and then replace the mapped expression by a column reference.
952            //
953            // We think this is due to `LiteralLifting`, and we might investigate removing the introduciton in
954            // the first place. The tell-tale that we see when we fix is a diff that look likes
955            //
956            // - Project (#0, #2)
957            // -   Filter (#1 = 1)
958            // -     Map (1)
959            // -       Get l0
960            // + Filter (#1 = 1)
961            // +   Get l0
962            //
963            for (index, expr) in self.expressions.iter_mut().enumerate() {
964                // If `expr` matches a filter equating it to a column < index + input_arity, rewrite it
965                for (_, predicate) in self.predicates.iter() {
966                    if let MirScalarExpr::CallBinary {
967                        func: crate::BinaryFunc::Eq(_),
968                        expr1,
969                        expr2,
970                    } = predicate
971                    {
972                        if let MirScalarExpr::Column(c, name) = &**expr1 {
973                            if *c < index + self.input_arity && &**expr2 == expr {
974                                *expr = MirScalarExpr::Column(*c, name.clone());
975                            }
976                        }
977                        if let MirScalarExpr::Column(c, name) = &**expr2 {
978                            if *c < index + self.input_arity && &**expr1 == expr {
979                                *expr = MirScalarExpr::Column(*c, name.clone());
980                            }
981                        }
982                    }
983                }
984            }
985
986            // Optimization memoizes individual `ScalarExpr` expressions that
987            // are sure to be evaluated, canonicalizes references to the first
988            // occurrence of each, inlines expressions that have a reference
989            // count of one, and then removes any expressions that are not
990            // referenced.
991            self.memoize_expressions();
992            self.predicates.sort();
993            self.predicates.dedup();
994            self.inline_expressions();
995            self.remove_undemanded();
996
997            // Re-build `self` from parts to restore evaluation order invariants.
998            let (map, filter, project) = self.as_map_filter_project();
999            *self = Self::new(self.input_arity)
1000                .map(map)
1001                .filter(filter)
1002                .project(project);
1003
1004            self_size = self.size();
1005        }
1006    }
1007
1008    /// Total expression sizes across all expressions.
1009    pub fn size(&self) -> usize {
1010        self.expressions.iter().map(|e| e.size()).sum::<usize>()
1011            + self.predicates.iter().map(|(_, e)| e.size()).sum::<usize>()
1012    }
1013
    /// Place each certainly evaluated expression in its own column.
    ///
    /// This method places each non-trivial, certainly evaluated expression
    /// in its own column, and deduplicates them so that all references to
    /// the same expression reference the same column.
    ///
    /// This transformation is restricted to expressions we are certain will
    /// be evaluated, which does not include expressions in `if` statements.
    ///
    /// Map expressions and predicates are visited in evaluation order. Before
    /// each predicate is memoized, every map expression positioned before the
    /// predicate's recorded position is processed, so any column the predicate
    /// references already has its new location. Afterwards the projection is
    /// rewritten to the new column locations, and each predicate's position is
    /// recomputed from its new support.
    ///
    /// # Example
    ///
    /// This example demonstrates how memoization notices `MirScalarExpr`s
    /// that are used multiple times, and ensures that each is extracted
    /// into a column and then referenced by column. This pass does not try
    /// to minimize the occurrences of column references; that happens
    /// during inlining.
    ///
    /// ```rust
    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
    /// // Demonstrate extraction of common expressions (here: parsing strings).
    /// let mut map_filter_project = MapFilterProject::new(5)
    ///     .map(vec![
    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(1).call_unary(func::CastStringToInt64), func::AddInt64),
    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
    ///     ])
    ///     .project(vec![3,4,5,6]);
    ///
    /// let mut expected_optimized = MapFilterProject::new(5)
    ///     .map(vec![
    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), func::AddInt64),
    ///         MirScalarExpr::column(7),
    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), func::AddInt64),
    ///         MirScalarExpr::column(10),
    ///     ])
    ///     .project(vec![3,4,8,11]);
    ///
    /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
    /// map_filter_project.memoize_expressions();
    ///
    /// assert_eq!(
    ///     map_filter_project,
    ///     expected_optimized,
    /// );
    /// ```
    ///
    /// Expressions may not be memoized if they are not certain to be evaluated,
    /// for example if they occur in conditional branches of a `MirScalarExpr::If`.
    ///
    /// ```rust
    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
    /// // Demonstrate extraction of unconditionally evaluated expressions, as well as
    /// // the non-extraction of common expressions guarded by conditions.
    /// let mut map_filter_project = MapFilterProject::new(2)
    ///     .map(vec![
    ///         MirScalarExpr::If {
    ///             cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt)),
    ///             then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
    ///             els:  Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
    ///         },
    ///         MirScalarExpr::If {
    ///             cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt)),
    ///             then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
    ///             els:  Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
    ///         },
    ///     ]);
    ///
    /// let mut expected_optimized = MapFilterProject::new(2)
    ///     .map(vec![
    ///         MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt),
    ///         MirScalarExpr::If {
    ///             cond: Box::new(MirScalarExpr::column(2)),
    ///             then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
    ///             els:  Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
    ///         },
    ///         MirScalarExpr::column(3),
    ///         MirScalarExpr::If {
    ///             cond: Box::new(MirScalarExpr::column(2)),
    ///             then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
    ///             els:  Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
    ///         },
    ///         MirScalarExpr::column(5),
    ///     ])
    ///     .project(vec![0,1,4,6]);
    ///
    /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
    /// map_filter_project.memoize_expressions();
    ///
    /// assert_eq!(
    ///     map_filter_project,
    ///     expected_optimized,
    /// );
    /// ```
    pub fn memoize_expressions(&mut self) {
        // Record the mapping from starting column references to new column
        // references. Input columns keep their positions.
        let mut remaps = BTreeMap::new();
        for index in 0..self.input_arity {
            remaps.insert(index, index);
        }
        // Replacement list of map expressions. `memoize_expr` receives it
        // mutably (presumably to append extracted subexpressions; see its
        // definition), and each original map expression is pushed after that.
        let mut new_expressions = Vec::new();

        // We follow the same order as for evaluation, to ensure that all
        // column references exist in time for their evaluation. We could
        // prioritize predicates, but we would need to be careful to chase
        // down column references to expressions and memoize those as well.
        // `expression` indexes the next original map expression to process.
        let mut expression = 0;
        for (support, predicate) in self.predicates.iter_mut() {
            // Process the map expressions whose column lies before this
            // predicate's recorded position.
            while self.input_arity + expression < *support {
                // Rewrite column references to the new locations recorded so far.
                self.expressions[expression].permute_map(&remaps);
                memoize_expr(
                    &mut self.expressions[expression],
                    &mut new_expressions,
                    self.input_arity,
                );
                // The expression will occupy the next free column, i.e. the
                // position that the `push` below places it at.
                remaps.insert(
                    self.input_arity + expression,
                    self.input_arity + new_expressions.len(),
                );
                new_expressions.push(self.expressions[expression].clone());
                expression += 1;
            }
            // Predicates are rewritten and memoized in place. They are not
            // themselves appended to `new_expressions`.
            predicate.permute_map(&remaps);
            memoize_expr(predicate, &mut new_expressions, self.input_arity);
        }
        // Process the map expressions that follow the last predicate.
        while expression < self.expressions.len() {
            self.expressions[expression].permute_map(&remaps);
            memoize_expr(
                &mut self.expressions[expression],
                &mut new_expressions,
                self.input_arity,
            );
            remaps.insert(
                self.input_arity + expression,
                self.input_arity + new_expressions.len(),
            );
            new_expressions.push(self.expressions[expression].clone());
            expression += 1;
        }

        self.expressions = new_expressions;
        // Point the projection at the new column locations.
        for proj in self.projection.iter_mut() {
            *proj = remaps[proj];
        }

        // Restore predicate order invariants: each predicate's position becomes
        // one past the largest column it references, or 0 if it references none.
        for (pos, pred) in self.predicates.iter_mut() {
            *pos = pred.support().into_iter().max().map(|x| x + 1).unwrap_or(0);
        }
    }
1166
    /// This method inlines expressions that are column references, that have a
    /// single use, or that involve temporal expressions.
    ///
    /// This method only inlines expressions; it does not delete expressions
    /// that are no longer referenced. The `remove_undemanded()` method does
    /// that, and should likely be used after this method.
    ///
    /// Inlining replaces column references when the referred-to item is
    /// another column reference, the only referrer of its referent, or an
    /// expression that contains (or references a column containing) a temporal
    /// expression, which cannot be evaluated directly. This is most common
    /// after memoization has atomized all expressions to seek out re-use:
    /// inlining re-assembles expressions that were not helpfully shared with
    /// other expressions.
    ///
    /// Finally, projection entries that name a column-reference expression are
    /// redirected to the column that expression references.
    ///
    /// # Example
    ///
    /// In this example, we see that with only a single reference to columns
    /// 0 and 2, their parsing can each be inlined. Similarly, column references
    /// can be cleaned up among expressions, and in the final projection.
    ///
    /// Also notice the remaining expressions, which can be cleaned up in a later
    /// pass (the `remove_undemanded` method).
    ///
    /// ```rust
    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
    /// // Use the output from first `memoize_expression` example.
    /// let mut map_filter_project = MapFilterProject::new(5)
    ///     .map(vec![
    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), func::AddInt64),
    ///         MirScalarExpr::column(7),
    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), func::AddInt64),
    ///         MirScalarExpr::column(10),
    ///     ])
    ///     .project(vec![3,4,8,11]);
    ///
    /// let mut expected_optimized = MapFilterProject::new(5)
    ///     .map(vec![
    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
    ///     ])
    ///     .project(vec![3,4,8,11]);
    ///
    /// // Inline expressions that are referenced only once.
    /// map_filter_project.inline_expressions();
    ///
    /// assert_eq!(
    ///     map_filter_project,
    ///     expected_optimized,
    /// );
    /// ```
    pub fn inline_expressions(&mut self) {
        // Local copy of input_arity to avoid borrowing `self` in closures.
        let input_arity = self.input_arity;
        // Reference counts track the number of places that a reference occurs.
        // The element type is not annotated, so the counts default to `i32`.
        let mut reference_count = vec![0; input_arity + self.expressions.len()];
        // Increment reference counts for each use in map expressions,
        // predicates, and the projection.
        for expr in self.expressions.iter() {
            expr.visit_pre(|e| {
                if let MirScalarExpr::Column(i, _name) = e {
                    reference_count[*i] += 1;
                }
            });
        }
        for (_, pred) in self.predicates.iter() {
            pred.visit_pre(|e| {
                if let MirScalarExpr::Column(i, _name) = e {
                    reference_count[*i] += 1;
                }
            });
        }
        for proj in self.projection.iter() {
            reference_count[*proj] += 1;
        }

        // Determine which expressions should be inlined because they reference temporal expressions.
        let mut is_temporal = vec![false; input_arity];
        for expr in self.expressions.iter() {
            // An expression may contain a temporal expression, or reference a column containing such.
            is_temporal.push(
                expr.contains_temporal() || expr.support().into_iter().any(|col| is_temporal[col]),
            );
        }

        // Inline only those columns that 1. are expressions not inputs, and
        // 2a. are column references or 2b. have a refcount of 1,
        // or 2c. reference temporal expressions (which cannot be evaluated).
        //
        // Iterate in reverse: expressions reference only earlier columns, so
        // when a column-reference expression forwards its count to column `c`,
        // `c` has not been decided yet and sees the adjusted count.
        let mut should_inline = vec![false; reference_count.len()];
        for i in (input_arity..reference_count.len()).rev() {
            if let MirScalarExpr::Column(c, _) = self.expressions[i - input_arity] {
                should_inline[i] = true;
                // The reference count of the referenced column should be
                // incremented with the number of references
                // `self.expressions[i - input_arity]` has.
                // Subtract 1 because `self.expressions[i - input_arity]` is
                // itself a reference. Counts are signed, so if column `i` is
                // unreferenced this adds -1 rather than underflowing.
                reference_count[c] += reference_count[i] - 1;
            } else {
                should_inline[i] = reference_count[i] == 1 || is_temporal[i];
            }
        }
        // Inline expressions per `should_inline`.
        self.perform_inlining(should_inline);
        // We can only inline column references in `self.projection`, but we should.
        // This follows a single level: a projected expression that is (after
        // inlining) exactly a column reference is replaced by that column.
        for proj in self.projection.iter_mut() {
            if *proj >= self.input_arity {
                if let MirScalarExpr::Column(i, _) = self.expressions[*proj - self.input_arity] {
                    // TODO(mgree) !!! propagate name information to projection
                    *proj = i;
                }
            }
        }
    }
1285
1286    /// Inlines those expressions that are indicated by should_inline.
1287    /// See `inline_expressions` for usage.
1288    pub fn perform_inlining(&mut self, should_inline: Vec<bool>) {
1289        for index in 0..self.expressions.len() {
1290            let (prior, expr) = self.expressions.split_at_mut(index);
1291            #[allow(deprecated)]
1292            expr[0].visit_mut_post_nolimit(&mut |e| {
1293                if let MirScalarExpr::Column(i, _name) = e {
1294                    if should_inline[*i] {
1295                        *e = prior[*i - self.input_arity].clone();
1296                    }
1297                }
1298            });
1299        }
1300        for (_index, pred) in self.predicates.iter_mut() {
1301            let expressions = &self.expressions;
1302            #[allow(deprecated)]
1303            pred.visit_mut_post_nolimit(&mut |e| {
1304                if let MirScalarExpr::Column(i, _name) = e {
1305                    if should_inline[*i] {
1306                        *e = expressions[*i - self.input_arity].clone();
1307                    }
1308                }
1309            });
1310        }
1311    }
1312
1313    /// Removes unused expressions from `self.expressions`.
1314    ///
1315    /// Expressions are "used" if they are relied upon by any output columns
1316    /// or any predicates, even transitively. Any expressions that are not
1317    /// relied upon in this way can be discarded.
1318    ///
1319    /// # Example
1320    ///
1321    /// ```rust
1322    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1323    /// // Use the output from `inline_expression` example.
1324    /// let mut map_filter_project = MapFilterProject::new(5)
1325    ///     .map(vec![
1326    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1327    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1328    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1329    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1330    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1331    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1332    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1333    ///     ])
1334    ///     .project(vec![3,4,8,11]);
1335    ///
1336    /// let mut expected_optimized = MapFilterProject::new(5)
1337    ///     .map(vec![
1338    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1339    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(5), func::AddInt64),
1340    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1341    ///     ])
1342    ///     .project(vec![3,4,6,7]);
1343    ///
1344    /// // Remove undemanded expressions, streamlining the work done..
1345    /// map_filter_project.remove_undemanded();
1346    ///
1347    /// assert_eq!(
1348    ///     map_filter_project,
1349    ///     expected_optimized,
1350    /// );
1351    /// ```
1352    pub fn remove_undemanded(&mut self) {
1353        // Determine the demanded expressions to remove irrelevant ones.
1354        let mut demand = BTreeSet::new();
1355        for (_index, pred) in self.predicates.iter() {
1356            demand.extend(pred.support());
1357        }
1358        // Start from the output columns as presumed demanded.
1359        // If this is not the case, the caller should project some away.
1360        demand.extend(self.projection.iter().cloned());
1361        // Proceed in *reverse* order, as expressions may depend on other
1362        // expressions that precede them.
1363        for index in (0..self.expressions.len()).rev() {
1364            if demand.contains(&(self.input_arity + index)) {
1365                demand.extend(self.expressions[index].support());
1366            }
1367        }
1368
1369        // Maintain a map from initial column identifiers to locations
1370        // once we have removed undemanded expressions.
1371        let mut remap = BTreeMap::new();
1372        // This map only needs to map elements of `demand` to a new location,
1373        // but the logic is easier if we include all input columns (as the
1374        // new position is then determined by the size of the map).
1375        for index in 0..self.input_arity {
1376            remap.insert(index, index);
1377        }
1378        // Retain demanded expressions, and record their new locations.
1379        let mut new_expressions = Vec::new();
1380        for (index, expr) in self.expressions.drain(..).enumerate() {
1381            if demand.contains(&(index + self.input_arity)) {
1382                remap.insert(index + self.input_arity, remap.len());
1383                new_expressions.push(expr);
1384            }
1385        }
1386        self.expressions = new_expressions;
1387
1388        // Update column identifiers; rebuild `Self` to re-establish any invariants.
1389        // We mirror `self.permute(&remap)` but we specifically want to remap columns
1390        // that are produced by `self.expressions` after the input columns.
1391        let (expressions, predicates, projection) = self.as_map_filter_project();
1392        *self = Self::new(self.input_arity)
1393            .map(expressions.into_iter().map(|mut e| {
1394                e.permute_map(&remap);
1395                e
1396            }))
1397            .filter(predicates.into_iter().map(|mut p| {
1398                p.permute_map(&remap);
1399                p
1400            }))
1401            .project(projection.into_iter().map(|c| remap[&c]));
1402    }
1403}
1404
1405// TODO: move this elsewhere?
1406/// Recursively memoize parts of `expr`, storing those parts in `memoized_parts`.
1407///
1408/// A part of `expr` that is memoized is replaced by a reference to column
1409/// `(input_arity + pos)`, where `pos` is the position of the memoized part in
1410/// `memoized_parts`, and `input_arity` is the arity of the input that `expr`
1411/// refers to.
1412pub fn memoize_expr(
1413    expr: &mut MirScalarExpr,
1414    memoized_parts: &mut Vec<MirScalarExpr>,
1415    input_arity: usize,
1416) {
1417    #[allow(deprecated)]
1418    expr.visit_mut_pre_post_nolimit(
1419        &mut |e| {
1420            // We should not eagerly memoize `if` branches that might not be taken.
1421            // TODO: Memoize expressions in the intersection of `then` and `els`.
1422            if let MirScalarExpr::If { cond, .. } = e {
1423                return Some(vec![cond]);
1424            }
1425
1426            // We should not eagerly memoize `COALESCE` expressions after the first,
1427            // as they are only meant to be evaluated if the preceding expressions
1428            // evaluate to NULL. We could memoize any preceding by expressions that
1429            // are certain not to error.
1430            if let MirScalarExpr::CallVariadic {
1431                func: crate::VariadicFunc::Coalesce,
1432                exprs,
1433            } = e
1434            {
1435                return Some(exprs.iter_mut().take(1).collect());
1436            }
1437
1438            // We should not deconstruct temporal filters, because `MfpPlan::create_from` expects
1439            // those to be in a specific form. However, we _should_ attend to the expression that is
1440            // on the opposite side of mz_now(), because it might be a complex expression in itself,
1441            // and is ok to deconstruct.
1442            if let Some((_func, other_side)) = e.as_mut_temporal_filter().ok() {
1443                return Some(vec![other_side]);
1444            }
1445
1446            None
1447        },
1448        &mut |e| {
1449            match e {
1450                MirScalarExpr::Literal(_, _) => {
1451                    // Literals do not need to be memoized.
1452                }
1453                MirScalarExpr::Column(col, _) => {
1454                    // Column references do not need to be memoized, but may need to be
1455                    // updated if they reference a column reference themselves.
1456                    if *col > input_arity {
1457                        if let MirScalarExpr::Column(col2, _) = memoized_parts[*col - input_arity] {
1458                            // We do _not_ propagate column names, since mis-associating names and column
1459                            // references will be very confusing (and possibly bug-inducing).
1460                            *col = col2;
1461                        }
1462                    }
1463                }
1464                _ => {
1465                    // TODO: OOO (Optimizer Optimization Opportunity):
1466                    // we are quadratic in expression size because of this .iter().position
1467                    if let Some(position) = memoized_parts.iter().position(|e2| e2 == e) {
1468                        // Any complex expression that already exists as a prior column can
1469                        // be replaced by a reference to that column.
1470                        *e = MirScalarExpr::column(input_arity + position);
1471                    } else {
1472                        // A complex expression that does not exist should be memoized, and
1473                        // replaced by a reference to the column.
1474                        memoized_parts.push(std::mem::replace(
1475                            e,
1476                            MirScalarExpr::column(input_arity + memoized_parts.len()),
1477                        ));
1478                    }
1479                }
1480            }
1481        },
1482    )
1483}
1484
1485pub mod util {
1486    use std::collections::BTreeMap;
1487
1488    use crate::MirScalarExpr;
1489
1490    #[allow(dead_code)]
1491    /// A triple of actions that map from rows to (key, val) pairs and back again.
1492    struct KeyValRowMapping {
1493        /// Expressions to apply to a row to produce key datums.
1494        to_key: Vec<MirScalarExpr>,
1495        /// Columns to project from a row to produce residual value datums.
1496        to_val: Vec<usize>,
1497        /// Columns to project from the concatenation of key and value to reconstruct the row.
1498        to_row: Vec<usize>,
1499    }
1500
1501    /// Derive supporting logic to support transforming rows to (key, val) pairs,
1502    /// and back again.
1503    ///
1504    /// We are given as input a list of key expressions and an input arity, and the
1505    /// requirement the produced key should be the application of the key expressions.
1506    /// To produce the `val` output, we will identify those input columns not found in
1507    /// the key expressions, and name all other columns.
1508    /// To reconstitute the original row, we identify the sequence of columns from the
1509    /// concatenation of key and val which would reconstruct the original row.
1510    ///
1511    /// The output is a pair of column sequences, the first used to reconstruct a row
1512    /// from the concatenation of key and value, and the second to identify the columns
1513    /// of a row that should become the value associated with its key.
1514    ///
1515    /// The permutations and thinning expressions generated here will be tracked in
1516    /// `dataflow::plan::AvailableCollections`; see the
1517    /// documentation there for more details.
1518    pub fn permutation_for_arrangement(
1519        key: &[MirScalarExpr],
1520        unthinned_arity: usize,
1521    ) -> (Vec<usize>, Vec<usize>) {
1522        let columns_in_key: BTreeMap<_, _> = key
1523            .iter()
1524            .enumerate()
1525            .filter_map(|(i, key_col)| key_col.as_column().map(|c| (c, i)))
1526            .collect();
1527        let mut input_cursor = key.len();
1528        let permutation = (0..unthinned_arity)
1529            .map(|c| {
1530                if let Some(c) = columns_in_key.get(&c) {
1531                    // Column is in key (and thus gone from the value
1532                    // of the thinned representation)
1533                    *c
1534                } else {
1535                    // Column remains in value of the thinned representation
1536                    input_cursor += 1;
1537                    input_cursor - 1
1538                }
1539            })
1540            .collect();
1541        let thinning = (0..unthinned_arity)
1542            .filter(|c| !columns_in_key.contains_key(c))
1543            .collect();
1544        (permutation, thinning)
1545    }
1546
1547    /// Given the permutations (see [`permutation_for_arrangement`] and
1548    /// (`dataflow::plan::AvailableCollections`) corresponding to two
1549    /// collections with the same key arity,
1550    /// computes the permutation for the result of joining them.
1551    pub fn join_permutations(
1552        key_arity: usize,
1553        stream_permutation: Vec<usize>,
1554        thinned_stream_arity: usize,
1555        lookup_permutation: Vec<usize>,
1556    ) -> BTreeMap<usize, usize> {
1557        let stream_arity = stream_permutation.len();
1558        let lookup_arity = lookup_permutation.len();
1559
1560        (0..stream_arity + lookup_arity)
1561            .map(|i| {
1562                let location = if i < stream_arity {
1563                    stream_permutation[i]
1564                } else {
1565                    let location_in_lookup = lookup_permutation[i - stream_arity];
1566                    if location_in_lookup < key_arity {
1567                        location_in_lookup
1568                    } else {
1569                        location_in_lookup + thinned_stream_arity
1570                    }
1571                };
1572                (i, location)
1573            })
1574            .collect()
1575    }
1576}
1577
1578pub mod plan {
1579    use std::iter;
1580
1581    use mz_repr::{Datum, Diff, Row, RowArena};
1582    use serde::{Deserialize, Serialize};
1583
1584    use crate::{BinaryFunc, EvalError, MapFilterProject, MirScalarExpr, UnaryFunc, func};
1585
1586    /// A wrapper type which indicates it is safe to simply evaluate all expressions.
1587    #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
1588    pub struct SafeMfpPlan {
1589        pub(crate) mfp: MapFilterProject,
1590    }
1591
1592    impl SafeMfpPlan {
1593        /// Remaps references to input columns according to `remap`.
1594        ///
1595        /// Leaves other column references, e.g. to newly mapped columns, unchanged.
1596        pub fn permute_fn<F>(&mut self, remap: F, new_arity: usize)
1597        where
1598            F: Fn(usize) -> usize,
1599        {
1600            self.mfp.permute_fn(remap, new_arity);
1601        }
1602        /// Evaluates the linear operator on a supplied list of datums.
1603        ///
1604        /// The arguments are the initial datums associated with the row,
1605        /// and an appropriately lifetimed arena for temporary allocations
1606        /// needed by scalar evaluation.
1607        ///
1608        /// An `Ok` result will either be `None` if any predicate did not
1609        /// evaluate to `Datum::True`, or the values of the columns listed
1610        /// by `self.projection` if all predicates passed. If an error
1611        /// occurs in the evaluation it is returned as an `Err` variant.
1612        /// As the evaluation exits early with failed predicates, it may
1613        /// miss some errors that would occur later in evaluation.
1614        ///
1615        /// The `row` is not cleared first, but emptied if the function
1616        /// returns `Ok(Some(row)).
1617        #[inline(always)]
1618        pub fn evaluate_into<'a, 'row>(
1619            &'a self,
1620            datums: &mut Vec<Datum<'a>>,
1621            arena: &'a RowArena,
1622            row_buf: &'row mut Row,
1623        ) -> Result<Option<&'row Row>, EvalError> {
1624            let passed_predicates = self.evaluate_inner(datums, arena)?;
1625            if !passed_predicates {
1626                Ok(None)
1627            } else {
1628                row_buf
1629                    .packer()
1630                    .extend(self.mfp.projection.iter().map(|c| datums[*c]));
1631                Ok(Some(row_buf))
1632            }
1633        }
1634
1635        /// A version of `evaluate` which produces an iterator over `Datum`
1636        /// as output.
1637        ///
1638        /// This version can be useful when one wants to capture the resulting
1639        /// datums without packing and then unpacking a row.
1640        #[inline(always)]
1641        pub fn evaluate_iter<'b, 'a: 'b>(
1642            &'a self,
1643            datums: &'b mut Vec<Datum<'a>>,
1644            arena: &'a RowArena,
1645        ) -> Result<Option<impl Iterator<Item = Datum<'a>> + 'b>, EvalError> {
1646            let passed_predicates = self.evaluate_inner(datums, arena)?;
1647            if !passed_predicates {
1648                Ok(None)
1649            } else {
1650                Ok(Some(self.mfp.projection.iter().map(move |i| datums[*i])))
1651            }
1652        }
1653
1654        /// Populates `datums` with `self.expressions` and tests `self.predicates`.
1655        ///
1656        /// This does not apply `self.projection`, which is up to the calling method.
1657        pub fn evaluate_inner<'b, 'a: 'b>(
1658            &'a self,
1659            datums: &'b mut Vec<Datum<'a>>,
1660            arena: &'a RowArena,
1661        ) -> Result<bool, EvalError> {
1662            let mut expression = 0;
1663            for (support, predicate) in self.mfp.predicates.iter() {
1664                while self.mfp.input_arity + expression < *support {
1665                    datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
1666                    expression += 1;
1667                }
1668                if predicate.eval(&datums[..], arena)? != Datum::True {
1669                    return Ok(false);
1670                }
1671            }
1672            while expression < self.mfp.expressions.len() {
1673                datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
1674                expression += 1;
1675            }
1676            Ok(true)
1677        }
1678
1679        /// Returns true if evaluation could introduce an error on non-error inputs.
1680        pub fn could_error(&self) -> bool {
1681            self.mfp.predicates.iter().any(|(_pos, e)| e.could_error())
1682                || self.mfp.expressions.iter().any(|e| e.could_error())
1683        }
1684
1685        /// Returns true when `Self` is the identity.
1686        pub fn is_identity(&self) -> bool {
1687            self.mfp.is_identity()
1688        }
1689    }
1690
1691    impl std::ops::Deref for SafeMfpPlan {
1692        type Target = MapFilterProject;
1693        fn deref(&self) -> &Self::Target {
1694            &self.mfp
1695        }
1696    }
1697
1698    /// Predicates partitioned into temporal and non-temporal.
1699    ///
1700    /// Temporal predicates require some recognition to determine their
1701    /// structure, and it is best to do that once and re-use the results.
1702    ///
1703    /// There are restrictions on the temporal predicates we currently support.
1704    /// They must directly constrain `MzNow` from below or above,
1705    /// by expressions that do not themselves contain `MzNow`.
1706    /// Conjunctions of such constraints are also ok.
1707    #[derive(Clone, Debug, PartialEq)]
1708    pub struct MfpPlan {
1709        /// Normal predicates to evaluate on `&[Datum]` and expect `Ok(Datum::True)`.
1710        pub(crate) mfp: SafeMfpPlan,
1711        /// Expressions that when evaluated lower-bound `MzNow`.
1712        pub(crate) lower_bounds: Vec<MirScalarExpr>,
1713        /// Expressions that when evaluated upper-bound `MzNow`.
1714        pub(crate) upper_bounds: Vec<MirScalarExpr>,
1715    }
1716
1717    impl MfpPlan {
1718        /// Partitions `predicates` into non-temporal, and lower and upper temporal bounds.
1719        ///
1720        /// The first returned list is of predicates that do not contain `mz_now`.
1721        /// The second and third returned lists contain expressions that, once evaluated, lower
1722        /// and upper bound the validity interval of a record, respectively. These second two
1723        /// lists are populated only by binary expressions of the form
1724        /// ```ignore
1725        /// mz_now cmp_op expr
1726        /// ```
1727        /// where `cmp_op` is a comparison operator and `expr` does not contain `mz_now`.
1728        ///
1729        /// If any unsupported expression is found, for example one that uses `mz_now`
1730        /// in an unsupported position, an error is returned.
1731        pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, String> {
1732            let mut lower_bounds = Vec::new();
1733            let mut upper_bounds = Vec::new();
1734
1735            let mut temporal = Vec::new();
1736
1737            // Optimize, to ensure that temporal predicates are move in to `mfp.predicates`.
1738            mfp.optimize();
1739
1740            mfp.predicates.retain(|(_position, predicate)| {
1741                if predicate.contains_temporal() {
1742                    temporal.push(predicate.clone());
1743                    false
1744                } else {
1745                    true
1746                }
1747            });
1748
1749            for mut predicate in temporal.into_iter() {
1750                let (func, expr2) = predicate.as_mut_temporal_filter()?;
1751                let expr2 = expr2.clone();
1752
1753                // LogicalTimestamp <OP> <EXPR2> for several supported operators.
1754                match func {
1755                    BinaryFunc::Eq(_) => {
1756                        lower_bounds.push(expr2.clone());
1757                        upper_bounds.push(
1758                            expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
1759                        );
1760                    }
1761                    BinaryFunc::Lt(_) => {
1762                        upper_bounds.push(expr2.clone());
1763                    }
1764                    BinaryFunc::Lte(_) => {
1765                        upper_bounds.push(
1766                            expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
1767                        );
1768                    }
1769                    BinaryFunc::Gt(_) => {
1770                        lower_bounds.push(
1771                            expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
1772                        );
1773                    }
1774                    BinaryFunc::Gte(_) => {
1775                        lower_bounds.push(expr2.clone());
1776                    }
1777                    _ => {
1778                        return Err(format!("Unsupported binary temporal operation: {:?}", func));
1779                    }
1780                }
1781            }
1782
1783            Ok(Self {
1784                mfp: SafeMfpPlan { mfp },
1785                lower_bounds,
1786                upper_bounds,
1787            })
1788        }
1789
1790        /// Indicates if the planned `MapFilterProject` emits exactly its inputs as outputs.
1791        pub fn is_identity(&self) -> bool {
1792            self.mfp.mfp.is_identity()
1793                && self.lower_bounds.is_empty()
1794                && self.upper_bounds.is_empty()
1795        }
1796
1797        /// Returns `self`, and leaves behind an identity operator that acts on its output.
1798        pub fn take(&mut self) -> Self {
1799            let mut identity = Self {
1800                mfp: SafeMfpPlan {
1801                    mfp: MapFilterProject::new(self.mfp.projection.len()),
1802                },
1803                lower_bounds: Default::default(),
1804                upper_bounds: Default::default(),
1805            };
1806            std::mem::swap(self, &mut identity);
1807            identity
1808        }
1809
1810        /// Attempt to convert self into a non-temporal MapFilterProject plan.
1811        ///
1812        /// If that is not possible, the original instance is returned as an error.
1813        #[allow(clippy::result_large_err)]
1814        pub fn into_nontemporal(self) -> Result<SafeMfpPlan, Self> {
1815            if self.lower_bounds.is_empty() && self.upper_bounds.is_empty() {
1816                Ok(self.mfp)
1817            } else {
1818                Err(self)
1819            }
1820        }
1821
1822        /// Returns an iterator over mutable references to all non-temporal
1823        /// scalar expressions in the plan.
1824        ///
1825        /// The order of iteration is unspecified.
1826        pub fn iter_nontemporal_exprs(&mut self) -> impl Iterator<Item = &mut MirScalarExpr> {
1827            iter::empty()
1828                .chain(self.mfp.mfp.predicates.iter_mut().map(|(_, expr)| expr))
1829                .chain(&mut self.mfp.mfp.expressions)
1830                .chain(&mut self.lower_bounds)
1831                .chain(&mut self.upper_bounds)
1832        }
1833
1834        /// Evaluate the predicates, temporal and non-, and return times and differences for `data`.
1835        ///
1836        /// If `self` contains only non-temporal predicates, the result will either be `(time, diff)`,
1837        /// or an evaluation error. If `self contains temporal predicates, the results can be times
1838        /// that are greater than the input `time`, and may contain negated `diff` values.
1839        ///
1840        /// The `row_builder` is not cleared first, but emptied if the function
1841        /// returns an iterator with any `Ok(_)` element.
1842        pub fn evaluate<'b, 'a: 'b, E: From<EvalError>, V: Fn(&mz_repr::Timestamp) -> bool>(
1843            &'a self,
1844            datums: &'b mut Vec<Datum<'a>>,
1845            arena: &'a RowArena,
1846            time: mz_repr::Timestamp,
1847            diff: Diff,
1848            valid_time: V,
1849            row_builder: &mut Row,
1850        ) -> impl Iterator<
1851            Item = Result<(Row, mz_repr::Timestamp, Diff), (E, mz_repr::Timestamp, Diff)>,
1852        > + use<E, V> {
1853            match self.mfp.evaluate_inner(datums, arena) {
1854                Err(e) => {
1855                    return Some(Err((e.into(), time, diff))).into_iter().chain(None);
1856                }
1857                Ok(true) => {}
1858                Ok(false) => {
1859                    return None.into_iter().chain(None);
1860                }
1861            }
1862
1863            // Lower and upper bounds.
1864            let mut lower_bound = time;
1865            let mut upper_bound = None;
1866
1867            // Track whether we have seen a null in either bound, as this should
1868            // prevent the record from being produced at any time.
1869            let mut null_eval = false;
1870
1871            // Advance our lower bound to be at least the result of any lower bound
1872            // expressions.
1873            for l in self.lower_bounds.iter() {
1874                match l.eval(datums, arena) {
1875                    Err(e) => {
1876                        return Some(Err((e.into(), time, diff)))
1877                            .into_iter()
1878                            .chain(None.into_iter());
1879                    }
1880                    Ok(Datum::MzTimestamp(d)) => {
1881                        lower_bound = lower_bound.max(d);
1882                    }
1883                    Ok(Datum::Null) => {
1884                        null_eval = true;
1885                    }
1886                    x => {
1887                        panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
1888                    }
1889                }
1890            }
1891
1892            // If the lower bound exceeds our `until` frontier, it should not appear in the output.
1893            if !valid_time(&lower_bound) {
1894                return None.into_iter().chain(None);
1895            }
1896
1897            // If there are any upper bounds, determine the minimum upper bound.
1898            for u in self.upper_bounds.iter() {
1899                // We can cease as soon as the lower and upper bounds match,
1900                // as the update will certainly not be produced in that case.
1901                if upper_bound != Some(lower_bound) {
1902                    match u.eval(datums, arena) {
1903                        Err(e) => {
1904                            return Some(Err((e.into(), time, diff)))
1905                                .into_iter()
1906                                .chain(None.into_iter());
1907                        }
1908                        Ok(Datum::MzTimestamp(d)) => {
1909                            if let Some(upper) = upper_bound {
1910                                upper_bound = Some(upper.min(d));
1911                            } else {
1912                                upper_bound = Some(d);
1913                            };
1914                            // Force the upper bound to be at least the lower
1915                            // bound. The `is_some()` test should always be true
1916                            // due to the above block, but maintain it here in
1917                            // case that changes. It's hopefully optimized away.
1918                            if upper_bound.is_some() && upper_bound < Some(lower_bound) {
1919                                upper_bound = Some(lower_bound);
1920                            }
1921                        }
1922                        Ok(Datum::Null) => {
1923                            null_eval = true;
1924                        }
1925                        x => {
1926                            panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
1927                        }
1928                    }
1929                }
1930            }
1931
1932            // If the upper bound exceeds our `until` frontier, it should not appear in the output.
1933            if let Some(upper) = &mut upper_bound {
1934                if !valid_time(upper) {
1935                    upper_bound = None;
1936                }
1937            }
1938
1939            // Produce an output only if the upper bound exceeds the lower bound,
1940            // and if we did not encounter a `null` in our evaluation.
1941            if Some(lower_bound) != upper_bound && !null_eval {
1942                row_builder
1943                    .packer()
1944                    .extend(self.mfp.mfp.projection.iter().map(|c| datums[*c]));
1945                let upper_opt =
1946                    upper_bound.map(|upper_bound| Ok((row_builder.clone(), upper_bound, -diff)));
1947                let lower = Some(Ok((row_builder.clone(), lower_bound, diff)));
1948                lower.into_iter().chain(upper_opt)
1949            } else {
1950                None.into_iter().chain(None)
1951            }
1952        }
1953
1954        /// Returns true if evaluation could introduce an error on non-error inputs.
1955        pub fn could_error(&self) -> bool {
1956            self.mfp.could_error()
1957                || self.lower_bounds.iter().any(|e| e.could_error())
1958                || self.upper_bounds.iter().any(|e| e.could_error())
1959        }
1960
1961        /// Indicates that `Self` ignores its input to the extent that it can be evaluated on `&[]`.
1962        ///
1963        /// At the moment, this is only true if it projects away all columns and applies no filters,
1964        /// but it could be extended to plans that produce literals independent of the input.
1965        pub fn ignores_input(&self) -> bool {
1966            self.lower_bounds.is_empty()
1967                && self.upper_bounds.is_empty()
1968                && self.mfp.mfp.projection.is_empty()
1969                && self.mfp.mfp.predicates.is_empty()
1970        }
1971    }
1972}