Skip to main content

mz_expr/
linear.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9use std::collections::{BTreeMap, BTreeSet};
10use std::fmt::Display;
11
12use mz_repr::{Datum, Row};
13use serde::{Deserialize, Serialize};
14
15use crate::scalar::optimizable::OptimizableExpr;
16use crate::visit::Visit;
17use crate::{MirRelationExpr, MirScalarExpr};
18
19/// A compound operator that can be applied row-by-row.
20///
21/// This operator integrates the map, filter, and project operators.
22/// It applies a sequences of map expressions, which are allowed to
23/// refer to previous expressions, interleaved with predicates which
24/// must be satisfied for an output to be produced. If all predicates
25/// evaluate to `Datum::True` the data at the identified columns are
26/// collected and produced as output in a packed `Row`.
27///
28/// This operator is a "builder" and its contents may contain expressions
29/// that are not yet executable. For example, it may contain temporal
30/// expressions in `self.expressions`, even though this is not something
31/// we can directly evaluate. The plan creation methods will defensively
32/// ensure that the right thing happens.
33#[derive(
34    Clone,
35    Debug,
36    Eq,
37    PartialEq,
38    Serialize,
39    Deserialize,
40    Hash,
41    Ord,
42    PartialOrd
43)]
44#[serde(bound(deserialize = "E: serde::de::DeserializeOwned"))]
45pub struct MapFilterProject<E: OptimizableExpr = MirScalarExpr> {
46    /// A sequence of expressions that should be appended to the row.
47    ///
48    /// Many of these expressions may not be produced in the output,
49    /// and may only be present as common subexpressions.
50    pub expressions: Vec<E>,
51    /// Expressions that must evaluate to `Datum::True` for the output
52    /// row to be produced.
53    ///
54    /// Each entry is prepended with a column identifier indicating
55    /// the column *before* which the predicate should first be applied.
56    /// Most commonly this would be one plus the largest column identifier
57    /// in the predicate's support, but it could be larger to implement
58    /// guarded evaluation of predicates.
59    ///
60    /// This list should be sorted by the first field.
61    pub predicates: Vec<(usize, E)>,
62    /// A sequence of column identifiers whose data form the output row.
63    pub projection: Vec<usize>,
64    /// The expected number of input columns.
65    ///
66    /// This is needed to ensure correct identification of newly formed
67    /// columns in the output.
68    pub input_arity: usize,
69}
70
71impl<E: OptimizableExpr + Display> Display for MapFilterProject<E> {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        writeln!(f, "MapFilterProject(")?;
74        writeln!(f, "  expressions:")?;
75        self.expressions
76            .iter()
77            .enumerate()
78            .try_for_each(|(i, e)| writeln!(f, "    #{} <- {},", i + self.input_arity, e))?;
79        writeln!(f, "  predicates:")?;
80        self.predicates
81            .iter()
82            .try_for_each(|(before, p)| writeln!(f, "    <before: {}> {},", before, p))?;
83        writeln!(f, "  projection: {:?}", self.projection)?;
84        writeln!(f, "  input_arity: {}", self.input_arity)?;
85        writeln!(f, ")")
86    }
87}
88
89impl<E: OptimizableExpr> MapFilterProject<E> {
90    /// Create a no-op operator for an input of a supplied arity.
91    pub fn new(input_arity: usize) -> Self {
92        Self {
93            expressions: Vec::new(),
94            predicates: Vec::new(),
95            projection: (0..input_arity).collect(),
96            input_arity,
97        }
98    }
99
100    /// Given two mfps, return an mfp that applies one
101    /// followed by the other.
102    /// Note that the arguments are in the opposite order
103    /// from how function composition is usually written in mathematics.
104    pub fn compose(before: Self, after: Self) -> Self {
105        let (m, f, p) = after.into_map_filter_project();
106        before.map(m).filter(f).project(p)
107    }
108
109    /// True if the operator describes the identity transformation.
110    pub fn is_identity(&self) -> bool {
111        self.expressions.is_empty()
112            && self.predicates.is_empty()
113            && self.projection.len() == self.input_arity
114            && self.projection.iter().enumerate().all(|(i, p)| i == *p)
115    }
116
117    /// Retain only the indicated columns in the presented order.
118    pub fn project<I>(mut self, columns: I) -> Self
119    where
120        I: IntoIterator<Item = usize> + std::fmt::Debug,
121    {
122        self.projection = columns.into_iter().map(|c| self.projection[c]).collect();
123        self
124    }
125
126    /// Retain only rows satisfying these predicates.
127    ///
128    /// This method introduces predicates as eagerly as they can be evaluated,
129    /// which may not be desired for predicates that may cause exceptions.
130    /// If fine manipulation is required, the predicates can be added manually.
131    pub fn filter<I>(mut self, predicates: I) -> Self
132    where
133        I: IntoIterator<Item = E>,
134    {
135        for mut predicate in predicates {
136            // Correct column references.
137            predicate.permute(&self.projection[..]);
138
139            // Validate column references.
140            assert!(
141                predicate
142                    .support()
143                    .into_iter()
144                    .all(|c| c < self.input_arity + self.expressions.len())
145            );
146
147            // Insert predicate as eagerly as it can be evaluated:
148            // just after the largest column in its support is formed.
149            let max_support = predicate
150                .support()
151                .into_iter()
152                .max()
153                .map(|c| c + 1)
154                .unwrap_or(0);
155            self.predicates.push((max_support, predicate))
156        }
157        // Stable sort predicates by position at which they take effect.
158        // We put literal errors at the end as a stop-gap to avoid erroring
159        // before we are able to evaluate any predicates that might prevent it.
160        self.predicates
161            .sort_by_key(|(position, predicate)| (E::is_literal_err(predicate), *position));
162        self
163    }
164
165    /// Append the result of evaluating expressions to each row.
166    pub fn map<I>(mut self, expressions: I) -> Self
167    where
168        I: IntoIterator<Item = E>,
169    {
170        for mut expression in expressions {
171            // Correct column references.
172            expression.permute(&self.projection[..]);
173
174            // Validate column references.
175            assert!(
176                expression
177                    .support()
178                    .into_iter()
179                    .all(|c| c < self.input_arity + self.expressions.len())
180            );
181
182            // Introduce expression and produce as output.
183            self.expressions.push(expression);
184            self.projection
185                .push(self.input_arity + self.expressions.len() - 1);
186        }
187
188        self
189    }
190
191    /// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning.
192    pub fn into_map_filter_project(self) -> (Vec<E>, Vec<E>, Vec<usize>) {
193        let predicates = self
194            .predicates
195            .into_iter()
196            .map(|(_pos, predicate)| predicate)
197            .collect();
198        (self.expressions, predicates, self.projection)
199    }
200
201    /// As the arguments to `Map`, `Filter`, and `Project` operators.
202    ///
203    /// In principle, this operator can be implemented as a sequence of
204    /// more elemental operators, likely less efficiently.
205    pub fn as_map_filter_project(&self) -> (Vec<E>, Vec<E>, Vec<usize>) {
206        self.clone().into_map_filter_project()
207    }
208}
209
210// Methods that are specific to MirScalarExpr (use MirRelationExpr or as_literal).
211impl MapFilterProject<MirScalarExpr> {
212    /// Determines if a scalar expression must be equal to a literal datum.
213    pub fn literal_constraint(&self, expr: &MirScalarExpr) -> Option<Datum<'_>> {
214        for (_pos, predicate) in self.predicates.iter() {
215            if let MirScalarExpr::CallBinary {
216                func: crate::BinaryFunc::Eq(_),
217                expr1,
218                expr2,
219            } = predicate
220            {
221                if let Some(Ok(datum1)) = expr1.as_literal() {
222                    if &**expr2 == expr {
223                        return Some(datum1);
224                    }
225                }
226                if let Some(Ok(datum2)) = expr2.as_literal() {
227                    if &**expr1 == expr {
228                        return Some(datum2);
229                    }
230                }
231            }
232        }
233        None
234    }
235
236    /// Determines if a sequence of scalar expressions must be equal to a literal row.
237    ///
238    /// This method returns `None` on an empty `exprs`, which might be surprising, but
239    /// seems to line up with its callers' expectations of that being a non-constraint.
240    /// The caller knows if `exprs` is empty, and can modify their behavior appropriately.
241    /// if they would rather have a literal empty row.
242    pub fn literal_constraints(&self, exprs: &[MirScalarExpr]) -> Option<Row> {
243        if exprs.is_empty() {
244            return None;
245        }
246        let mut row = Row::default();
247        let mut packer = row.packer();
248        for expr in exprs {
249            if let Some(literal) = self.literal_constraint(expr) {
250                packer.push(literal);
251            } else {
252                return None;
253            }
254        }
255        Some(row)
256    }
257
258    /// Extracts any MapFilterProject at the root of the expression.
259    ///
260    /// The expression will be modified to extract any maps, filters, and
261    /// projections, which will be returned as `Self`. If there are no maps,
262    /// filters, or projections the method will return an identity operator.
263    ///
264    /// The extracted expressions may contain temporal predicates, and one
265    /// should be careful to apply them blindly.
266    pub fn extract_from_expression(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
267        // TODO: This could become iterative rather than recursive if
268        // we were able to fuse MFP operators from below, rather than
269        // from above.
270        match expr {
271            MirRelationExpr::Map { input, scalars } => {
272                let (mfp, expr) = Self::extract_from_expression(input);
273                (mfp.map(scalars.iter().cloned()), expr)
274            }
275            MirRelationExpr::Filter { input, predicates } => {
276                let (mfp, expr) = Self::extract_from_expression(input);
277                (mfp.filter(predicates.iter().cloned()), expr)
278            }
279            MirRelationExpr::Project { input, outputs } => {
280                let (mfp, expr) = Self::extract_from_expression(input);
281                (mfp.project(outputs.iter().cloned()), expr)
282            }
283            // TODO: The recursion is quadratic in the number of Map/Filter/Project operators due to
284            // this call to `arity()`.
285            x => (Self::new(x.arity()), x),
286        }
287    }
288
289    /// Extracts an error-free MapFilterProject at the root of the expression.
290    ///
291    /// The expression will be modified to extract maps, filters, and projects
292    /// from the root of the expression, which will be returned as `Self`. The
293    /// extraction will halt if a Map or Filter containing a literal error is
294    /// reached. Otherwise, the method will return an identity operator.
295    ///
296    /// This method is meant to be used during optimization, where it is
297    /// necessary to avoid moving around maps and filters with errors.
298    pub fn extract_non_errors_from_expr(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
299        match expr {
300            MirRelationExpr::Map { input, scalars }
301                if scalars.iter().all(|s| !s.is_literal_err()) =>
302            {
303                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
304                (mfp.map(scalars.iter().cloned()), expr)
305            }
306            MirRelationExpr::Filter { input, predicates }
307                if predicates.iter().all(|p| !p.is_literal_err()) =>
308            {
309                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
310                (mfp.filter(predicates.iter().cloned()), expr)
311            }
312            MirRelationExpr::Project { input, outputs } => {
313                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
314                (mfp.project(outputs.iter().cloned()), expr)
315            }
316            x => (Self::new(x.arity()), x),
317        }
318    }
319
320    /// Extracts an error-free MapFilterProject at the root of the expression.
321    ///
322    /// Differs from [MapFilterProject::extract_non_errors_from_expr] by taking and returning a
323    /// mutable reference.
324    pub fn extract_non_errors_from_expr_ref_mut(
325        expr: &mut MirRelationExpr,
326    ) -> (Self, &mut MirRelationExpr) {
327        // This is essentially the same code as `extract_non_errors_from_expr`, except the seemingly
328        // superfluous outer if, which works around a borrow-checker issue:
329        // https://github.com/rust-lang/rust/issues/54663
330        if matches!(
331            expr,
332            MirRelationExpr::Map { input: _, scalars }
333                if scalars.iter().all(|s| !s.is_literal_err())
334        ) || matches!(
335            expr,
336            MirRelationExpr::Filter { input: _, predicates }
337                if predicates.iter().all(|p| !p.is_literal_err())
338        ) || matches!(expr, MirRelationExpr::Project { .. })
339        {
340            match expr {
341                MirRelationExpr::Map { input, scalars }
342                    if scalars.iter().all(|s| !s.is_literal_err()) =>
343                {
344                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
345                    (mfp.map(scalars.iter().cloned()), expr)
346                }
347                MirRelationExpr::Filter { input, predicates }
348                    if predicates.iter().all(|p| !p.is_literal_err()) =>
349                {
350                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
351                    (mfp.filter(predicates.iter().cloned()), expr)
352                }
353                MirRelationExpr::Project { input, outputs } => {
354                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
355                    (mfp.project(outputs.iter().cloned()), expr)
356                }
357                _ => unreachable!(),
358            }
359        } else {
360            (Self::new(expr.arity()), expr)
361        }
362    }
363
364    /// Removes an error-free MapFilterProject from the root of the expression.
365    ///
366    /// The expression will be modified to extract maps, filters, and projects
367    /// from the root of the expression, which will be returned as `Self`. The
368    /// extraction will halt if a Map or Filter containing a literal error is
369    /// reached. Otherwise, the method will return an
370    /// identity operator, and the expression will remain unchanged.
371    ///
372    /// This method is meant to be used during optimization, where it is
373    /// necessary to avoid moving around maps and filters with errors.
374    pub fn extract_non_errors_from_expr_mut(expr: &mut MirRelationExpr) -> Self {
375        match expr {
376            MirRelationExpr::Map { input, scalars }
377                if scalars.iter().all(|s| !s.is_literal_err()) =>
378            {
379                let mfp =
380                    Self::extract_non_errors_from_expr_mut(input).map(scalars.iter().cloned());
381                *expr = input.take_dangerous();
382                mfp
383            }
384            MirRelationExpr::Filter { input, predicates }
385                if predicates.iter().all(|p| !p.is_literal_err()) =>
386            {
387                let mfp = Self::extract_non_errors_from_expr_mut(input)
388                    .filter(predicates.iter().cloned());
389                *expr = input.take_dangerous();
390                mfp
391            }
392            MirRelationExpr::Project { input, outputs } => {
393                let mfp =
394                    Self::extract_non_errors_from_expr_mut(input).project(outputs.iter().cloned());
395                *expr = input.take_dangerous();
396                mfp
397            }
398            x => Self::new(x.arity()),
399        }
400    }
401}
402
403impl<E: OptimizableExpr> MapFilterProject<E> {
404    /// Returns `true` if any predicate in this MFP contains a temporal expression (`mz_now()`).
405    pub fn has_temporal_predicates(&self) -> bool {
406        self.predicates
407            .iter()
408            .any(|(_, predicate)| OptimizableExpr::contains_temporal(predicate))
409    }
410
411    /// Extracts temporal predicates into their own `Self`.
412    ///
413    /// Expressions that are used by the temporal predicates are exposed by `self.projection`,
414    /// though there could be justification for extracting them as well if they are otherwise
415    /// unused.
416    ///
417    /// This separation is valuable when the execution cannot be fused into one operator.
418    pub fn extract_temporal(&mut self) -> Self {
419        // Optimize the expression, as it is only post-optimization that we can be certain
420        // that temporal expressions are restricted to filters. We could relax this in the
421        // future to be only `inline_expressions` and `remove_undemanded`, but optimization
422        // seems to be the best fit at the moment.
423        self.optimize();
424
425        // Assert that we no longer have temporal expressions to evaluate. This should only
426        // occur if the optimization above results with temporal expressions yielded in the
427        // output, which is out of spec for how the type is meant to be used.
428        assert!(
429            !self
430                .expressions
431                .iter()
432                .any(|e| OptimizableExpr::contains_temporal(e))
433        );
434
435        // Extract temporal predicates from `self.predicates`.
436        let mut temporal_predicates = Vec::new();
437        self.predicates.retain(|(_position, predicate)| {
438            if OptimizableExpr::contains_temporal(predicate) {
439                temporal_predicates.push(predicate.clone());
440                false
441            } else {
442                true
443            }
444        });
445
446        // Determine extended input columns used by temporal filters.
447        let mut support = BTreeSet::new();
448        for predicate in temporal_predicates.iter() {
449            support.extend(predicate.support());
450        }
451
452        // Discover the locations of these columns after `self.projection`.
453        let old_projection_len = self.projection.len();
454        let mut new_location = BTreeMap::new();
455        for original in support.iter() {
456            if let Some(position) = self.projection.iter().position(|x| x == original) {
457                new_location.insert(*original, position);
458            } else {
459                new_location.insert(*original, self.projection.len());
460                self.projection.push(*original);
461            }
462        }
463        // Permute references in extracted predicates to their new locations.
464        for predicate in temporal_predicates.iter_mut() {
465            predicate.permute_map(&new_location);
466        }
467
468        // Form a new `Self` containing the temporal predicates to return.
469        Self::new(self.projection.len())
470            .filter(temporal_predicates)
471            .project(0..old_projection_len)
472    }
473
474    /// Extracts common expressions from multiple `Self` into a result `Self`.
475    ///
476    /// The argument `mfps` are mutated so that each are functionaly equivalent to their
477    /// corresponding input, when composed atop the resulting `Self`.
478    ///
479    /// The `extract_exprs` argument is temporary, as we roll out the `extract_common_mfp_expressions` flag.
480    pub fn extract_common(mfps: &mut [&mut Self]) -> Self {
481        match mfps.len() {
482            0 => {
483                panic!("Cannot call method on empty arguments");
484            }
485            1 => {
486                let output_arity = mfps[0].projection.len();
487                std::mem::replace(mfps[0], MapFilterProject::new(output_arity))
488            }
489            _ => {
490                // More generally, we convert each mfp to ANF, at which point we can
491                // repeatedly extract atomic expressions that depend only on input
492                // columns, migrate them to an input mfp, and repeat until no such
493                // expressions exist. At this point, we can also migrate predicates
494                // and then determine and push down projections.
495
496                // Prepare a return `Self`.
497                let mut result_mfp = MapFilterProject::new(mfps[0].input_arity);
498
499                // We convert each mfp to ANF, using `memoize_expressions`.
500                for mfp in mfps.iter_mut() {
501                    mfp.memoize_expressions();
502                }
503
504                // We repeatedly extract common expressions, until none remain.
505                let mut done = false;
506                while !done {
507                    // We use references to determine common expressions, and must
508                    // introduce a scope here to drop the borrows before mutation.
509                    let common = {
510                        // The input arity may increase as we iterate, so recapture.
511                        let input_arity = result_mfp.projection.len();
512                        let mut prev: BTreeSet<_> = mfps[0]
513                            .expressions
514                            .iter()
515                            .filter(|e| e.support().last() < Some(&input_arity))
516                            .collect();
517                        let mut next = BTreeSet::default();
518                        for mfp in mfps[1..].iter() {
519                            for expr in mfp.expressions.iter() {
520                                if prev.contains(expr) {
521                                    next.insert(expr);
522                                }
523                            }
524                            std::mem::swap(&mut prev, &mut next);
525                            next.clear();
526                        }
527                        prev.into_iter().cloned().collect::<Vec<_>>()
528                    };
529                    // Without new common expressions, we should terminate the loop.
530                    done = common.is_empty();
531
532                    // Migrate each expression in `common` to `result_mfp`.
533                    for expr in common.into_iter() {
534                        // Update each mfp by removing expr and updating column references.
535                        for mfp in mfps.iter_mut() {
536                            // With `expr` next in `result_mfp`, it is as if we are rotating it to
537                            // be the first expression in `mfp`, and then removing it from `mfp` and
538                            // increasing the input arity of `mfp`.
539                            let arity = result_mfp.projection.len();
540                            let found = mfp.expressions.iter().position(|e| e == &expr).unwrap();
541                            let index = arity + found;
542                            // Column references change due to the rotation from `index` to `arity`.
543                            let action = |c: &mut usize| {
544                                if arity <= *c && *c < index {
545                                    *c += 1;
546                                } else if *c == index {
547                                    *c = arity;
548                                }
549                            };
550                            // Rotate `expr` from `found` to first, and then snip.
551                            // Short circuit by simply removing and incrementing the input arity.
552                            mfp.input_arity += 1;
553                            mfp.expressions.remove(found);
554                            // Update column references in expressions, predicates, and projections.
555                            for e in mfp.expressions.iter_mut() {
556                                e.visit_columns(action);
557                            }
558                            for (o, e) in mfp.predicates.iter_mut() {
559                                e.visit_columns(action);
560                                // Max out the offset for the predicate; optimization will correct.
561                                *o = mfp.input_arity + mfp.expressions.len();
562                            }
563                            for c in mfp.projection.iter_mut() {
564                                action(c);
565                            }
566                        }
567                        // Install the expression and update
568                        result_mfp.expressions.push(expr);
569                        result_mfp.projection.push(result_mfp.projection.len());
570                    }
571                }
572                // As before, but easier: predicates in common to all mfps.
573                let common_preds: Vec<E> = {
574                    let input_arity = result_mfp.projection.len();
575                    let mut prev: BTreeSet<_> = mfps[0]
576                        .predicates
577                        .iter()
578                        .map(|(_, e)| e)
579                        .filter(|e| e.support().last() < Some(&input_arity))
580                        .collect();
581                    let mut next = BTreeSet::default();
582                    for mfp in mfps[1..].iter() {
583                        for (_, expr) in mfp.predicates.iter() {
584                            if prev.contains(expr) {
585                                next.insert(expr);
586                            }
587                        }
588                        std::mem::swap(&mut prev, &mut next);
589                        next.clear();
590                    }
591                    // Expressions in common, that we will append to `result_mfp.expressions`.
592                    prev.into_iter().cloned().collect::<Vec<_>>()
593                };
594                for mfp in mfps.iter_mut() {
595                    mfp.predicates.retain(|(_, p)| !common_preds.contains(p));
596                    mfp.optimize();
597                }
598                result_mfp.predicates.extend(
599                    common_preds
600                        .into_iter()
601                        .map(|e| (result_mfp.projection.len(), e)),
602                );
603
604                // Then, look for unused columns and project them away.
605                let mut common_demand = BTreeSet::new();
606                for mfp in mfps.iter() {
607                    common_demand.extend(mfp.demand());
608                }
609                // columns in `common_demand` must be retained, but others
610                // may be discarded.
611                let common_demand = (0..result_mfp.projection.len())
612                    .filter(|x| common_demand.contains(x))
613                    .collect::<Vec<_>>();
614                let remap = common_demand
615                    .iter()
616                    .cloned()
617                    .enumerate()
618                    .map(|(new, old)| (old, new))
619                    .collect::<BTreeMap<_, _>>();
620                for mfp in mfps.iter_mut() {
621                    mfp.permute_fn(|c| remap[&c], common_demand.len());
622                }
623                result_mfp = result_mfp.project(common_demand);
624
625                // Return the resulting MFP.
626                result_mfp.optimize();
627                result_mfp
628            }
629        }
630    }
631
632    /// Returns `self`, and leaves behind an identity operator that acts on its output.
633    pub fn take(&mut self) -> Self {
634        let mut identity = Self::new(self.projection.len());
635        std::mem::swap(self, &mut identity);
636        identity
637    }
638
639    /// Convert the `MapFilterProject` into a staged evaluation plan.
640    ///
641    /// The main behavior is extract temporal predicates, which cannot be evaluated
642    /// using the standard machinery.
643    pub fn into_plan(self) -> Result<plan::MfpPlan<E>, String> {
644        plan::MfpPlan::create_from(self)
645    }
646}
647
648impl<E: OptimizableExpr> MapFilterProject<E> {
649    /// Partitions `self` into two instances, one of which can be eagerly applied.
650    ///
651    /// The `available` argument indicates which input columns are available (keys)
652    /// and in which positions (values). This information may allow some maps and
653    /// filters to execute. The `input_arity` argument reports the total number of
654    /// input columns (which may include some not present in `available`)
655    ///
656    /// This method partitions `self` in two parts, `(before, after)`, where `before`
657    /// can be applied on columns present as keys in `available`, and `after` must
658    /// await the introduction of the other input columns.
659    ///
660    /// The `before` instance will *append* any columns that can be determined from
661    /// `available` but will project away any of these columns that are not needed by
662    /// `after`. Importantly, this means that `before` will leave intact *all* input
663    /// columns including those not referenced in `available`.
664    ///
665    /// The `after` instance will presume all input columns are available, followed
666    /// by the appended columns of the `before` instance. It may be that some input
667    /// columns can be projected away in `before` if `after` does not need them, but
668    /// we leave that as something the caller can apply if needed (it is otherwise
669    /// complicated to negotiate which input columns `before` should retain).
670    ///
671    /// To correctly reconstruct `self` from `before` and `after`, one must introduce
672    /// additional input columns, permute all input columns to their locations as
673    /// expected by `self`, follow this by new columns appended by `before`, and
674    /// remove all other columns that may be present.
675    ///
676    /// # Example
677    ///
678    /// ```rust
679    /// use mz_expr::{BinaryFunc, MapFilterProject, MirScalarExpr, func};
680    ///
681    /// // imagine an action on columns (a, b, c, d).
682    /// let original = MapFilterProject::new(4).map(vec![
683    ///    MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::AddInt64),
684    ///    MirScalarExpr::column(2).call_binary(MirScalarExpr::column(4), func::AddInt64),
685    ///    MirScalarExpr::column(3).call_binary(MirScalarExpr::column(5), func::AddInt64),
686    /// ]).project(vec![6]);
687    ///
688    /// // Imagine we start with columns (b, x, a, y, c).
689    /// //
690    /// // The `partition` method requires a map from *expected* input columns to *actual*
691    /// // input columns. In the example above, the columns a, b, and c exist, and are at
692    /// // locations 2, 0, and 4 respectively. We must construct a map to this effect.
693    /// let mut available_columns = std::collections::BTreeMap::new();
694    /// available_columns.insert(0, 2);
695    /// available_columns.insert(1, 0);
696    /// available_columns.insert(2, 4);
697    /// // Partition `original` using the available columns and current input arity.
698    /// // This informs `partition` which columns are available, where they can be found,
699    /// // and how many columns are not relevant but should be preserved.
700    /// let (before, after) = original.partition(available_columns, 5);
701    ///
702    /// // `before` sees all five input columns, and should append `a + b + c`.
703    /// assert_eq!(before, MapFilterProject::new(5).map(vec![
704    ///    MirScalarExpr::column(2).call_binary(MirScalarExpr::column(0), func::AddInt64),
705    ///    MirScalarExpr::column(4).call_binary(MirScalarExpr::column(5), func::AddInt64),
706    /// ]).project(vec![0, 1, 2, 3, 4, 6]));
707    ///
708    /// // `after` expects to see `(a, b, c, d, a + b + c)`.
709    /// assert_eq!(after, MapFilterProject::new(5).map(vec![
710    ///    MirScalarExpr::column(3).call_binary(MirScalarExpr::column(4), func::AddInt64)
711    /// ]).project(vec![5]));
712    ///
713    /// // To reconstruct `self`, we must introduce the columns that are not present,
714    /// // and present them in the order intended by `self`. In this example, we must
715    /// // introduce column d and permute the columns so that they begin (a, b, c, d).
716    /// // The columns x and y must be projected away, and any columns introduced by
    /// // `before` must be retained in their current order.
718    ///
719    /// // The `after` instance expects to be provided with all inputs, but it
720    /// // may not need all inputs. The `demand()` and `permute()` methods can
721    /// // optimize the representation.
722    /// ```
723    pub fn partition(self, available: BTreeMap<usize, usize>, input_arity: usize) -> (Self, Self) {
724        // Map expressions, filter predicates, and projections for `before` and `after`.
725        let mut before_expr = Vec::new();
726        let mut before_pred = Vec::new();
727        let mut before_proj = Vec::new();
728        let mut after_expr = Vec::new();
729        let mut after_pred = Vec::new();
730        let mut after_proj = Vec::new();
731
732        // Track which output columns must be preserved in the output of `before`.
733        let mut demanded = BTreeSet::new();
734        demanded.extend(0..self.input_arity);
735        demanded.extend(self.projection.iter());
736
737        // Determine which map expressions can be computed from the available subset.
738        // Some expressions may depend on other expressions, but by evaluating them
739        // in forward order we should accurately determine the available expressions.
740        let mut available_expr = vec![false; self.input_arity];
741        // Initialize available columns from `available`, which is then not used again.
742        for index in available.keys() {
743            available_expr[*index] = true;
744        }
745        for expr in self.expressions.into_iter() {
746            // We treat an expression as available if its supporting columns are available,
747            // and if it is not a literal (we want to avoid pushing down literals). This
748            // choice is ad-hoc, but the intent is that we partition the operators so
749            // that we can reduce the row representation size and total computation.
750            // Pushing down literals harms the former and does nothing for the latter.
751            // In the future, we'll want to have a harder think about this trade-off, as
752            // we are certainly making sub-optimal decisions by pushing down all available
753            // work.
754            // TODO(mcsherry): establish better principles about what work to push down.
755            let is_available = expr.support().into_iter().all(|i| available_expr[i])
756                && !OptimizableExpr::is_literal(&expr);
757            if is_available {
758                before_expr.push(expr);
759            } else {
760                demanded.extend(expr.support());
761                after_expr.push(expr);
762            }
763            available_expr.push(is_available);
764        }
765
766        // Determine which predicates can be computed from the available subset.
767        for (_when, pred) in self.predicates.into_iter() {
768            let is_available = pred.support().into_iter().all(|i| available_expr[i]);
769            if is_available {
770                before_pred.push(pred);
771            } else {
772                demanded.extend(pred.support());
773                after_pred.push(pred);
774            }
775        }
776
777        // Map from prior output location to location in un-projected `before`.
778        // This map is used to correct references in `before` but it should be
779        // adjusted to reflect `before`s projection prior to use in `after`.
780        let mut before_map = available;
781        // Input columns include any additional undescribed columns that may
782        // not be captured by the `available` argument, so we must independently
783        // track the current number of columns (vs relying on `before_map.len()`).
784        let mut input_columns = input_arity;
785        for index in self.input_arity..available_expr.len() {
786            if available_expr[index] {
787                before_map.insert(index, input_columns);
788                input_columns += 1;
789            }
790        }
791
792        // Permute the column references in `before` expressions and predicates.
793        for expr in before_expr.iter_mut() {
794            expr.permute_map(&before_map);
795        }
796        for pred in before_pred.iter_mut() {
797            pred.permute_map(&before_map);
798        }
799
800        // Demand information determines `before`s output projection.
801        // Specifically, we produce all input columns in the output, as well as
802        // any columns that are available and demanded.
803        before_proj.extend(0..input_arity);
804        for index in self.input_arity..available_expr.len() {
805            // If an intermediate result is both available and demanded,
806            // we should produce it as output.
807            if available_expr[index] && demanded.contains(&index) {
808                // Use the new location of `index`.
809                before_proj.push(before_map[&index]);
810            }
811        }
812
813        // Map from prior output locations to location in post-`before` columns.
814        // This map is used to correct references in `after`.
815        // The presumption is that `after` will be presented with all input columns,
816        // followed by the output columns introduced by `before` in order.
817        let mut after_map = BTreeMap::new();
818        for index in 0..self.input_arity {
819            after_map.insert(index, index);
820        }
821        for index in self.input_arity..available_expr.len() {
822            // If an intermediate result is both available and demanded,
823            // it was produced as output.
824            if available_expr[index] && demanded.contains(&index) {
825                // We expect to find the output as far after `self.input_arity` as
826                // it was produced after `input_arity` in the output of `before`.
827                let location = self.input_arity
828                    + (before_proj
829                        .iter()
830                        .position(|x| x == &before_map[&index])
831                        .unwrap()
832                        - input_arity);
833                after_map.insert(index, location);
834            }
835        }
836        // We must now re-map the remaining non-demanded expressions, which are
837        // contiguous rather than potentially interspersed.
838        for index in self.input_arity..available_expr.len() {
839            if !available_expr[index] {
840                after_map.insert(index, after_map.len());
841            }
842        }
843
844        // Permute the column references in `after` expressions and predicates.
845        for expr in after_expr.iter_mut() {
846            expr.permute_map(&after_map);
847        }
848        for pred in after_pred.iter_mut() {
849            pred.permute_map(&after_map);
850        }
851        // Populate `after` projection with the new locations of `self.projection`.
852        for index in self.projection {
853            after_proj.push(after_map[&index]);
854        }
855
856        // Form and return the before and after MapFilterProject instances.
857        let before = Self::new(input_arity)
858            .map(before_expr)
859            .filter(before_pred)
860            .project(before_proj.clone());
861        let after = Self::new(self.input_arity + (before_proj.len() - input_arity))
862            .map(after_expr)
863            .filter(after_pred)
864            .project(after_proj);
865        (before, after)
866    }
867
868    /// Lists input columns whose values are used in outputs.
869    ///
870    /// You can use `BTreeSet::last()` to extract the maximum demanded column from the set.
871    ///
872    /// It is entirely appropriate to determine the demand of an instance
873    /// and then both apply a projection to the subject of the instance and
874    /// `self.permute` this instance.
875    pub fn demand(&self) -> BTreeSet<usize> {
876        let mut demanded = BTreeSet::new();
877        for (_index, pred) in self.predicates.iter() {
878            demanded.extend(pred.support());
879        }
880        demanded.extend(self.projection.iter().cloned());
881        for index in (0..self.expressions.len()).rev() {
882            if demanded.contains(&(self.input_arity + index)) {
883                demanded.extend(self.expressions[index].support());
884            }
885        }
886        demanded.retain(|col| col < &self.input_arity);
887        demanded
888    }
889
890    /// Update input column references, due to an input projection or permutation.
891    ///
892    /// The `shuffle` argument remaps expected column identifiers to new locations,
893    /// with the expectation that `shuffle` describes all input columns, and so the
894    /// intermediate results will be able to start at position `shuffle.len()`.
895    ///
896    /// The supplied `shuffle` might not list columns that are not "demanded" by the
897    /// instance, and so we should ensure that `self` is optimized to not reference
898    /// columns that are not demanded.
899    pub fn permute_fn<F>(&mut self, remap: F, new_input_arity: usize)
900    where
901        F: Fn(usize) -> usize,
902    {
903        let (mut map, mut filter, mut project) = self.as_map_filter_project();
904        let map_len = map.len();
905        let action = |col: &mut usize| {
906            if self.input_arity <= *col && *col < self.input_arity + map_len {
907                *col = new_input_arity + (*col - self.input_arity);
908            } else {
909                *col = remap(*col);
910            }
911        };
912        for expr in map.iter_mut() {
913            expr.visit_columns(action);
914        }
915        for pred in filter.iter_mut() {
916            pred.visit_columns(action);
917        }
918        for proj in project.iter_mut() {
919            action(proj);
920            assert!(*proj < new_input_arity + map.len());
921        }
922        *self = Self::new(new_input_arity)
923            .map(map)
924            .filter(filter)
925            .project(project)
926    }
927}
928
929// Optimization routines.
930impl<E: OptimizableExpr> MapFilterProject<E> {
931    /// Optimize the internal expression evaluation order.
932    ///
933    /// This method performs several optimizations that are meant to streamline
934    /// the execution of the `MapFilterProject` instance, but not to alter its
935    /// semantics. This includes extracting expressions that are used multiple
936    /// times, inlining those that are not, and removing expressions that are
937    /// unreferenced.
938    ///
939    /// This method will inline all temporal expressions, and remove any columns
940    /// that are not demanded by the output, which should transform any temporal
941    /// filters to a state where the temporal expressions exist only in the list
942    /// of predicates.
943    ///
944    /// # Example
945    ///
946    /// This example demonstrates how the re-use of one expression, converting
947    /// column 1 from a string to an integer, can be extracted and the results
948    /// shared among the two uses. This example is used for each of the steps
949    /// along the optimization path.
950    ///
951    /// ```rust
952    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
953    /// // Demonstrate extraction of common expressions (here: parsing strings).
954    /// let mut map_filter_project = MapFilterProject::new(5)
955    ///     .map(vec![
956    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(1).call_unary(func::CastStringToInt64), func::AddInt64),
957    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
958    ///     ])
959    ///     .project(vec![3,4,5,6]);
960    ///
961    /// let mut expected_optimized = MapFilterProject::new(5)
962    ///     .map(vec![
963    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
964    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(5), func::AddInt64),
965    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
966    ///     ])
967    ///     .project(vec![3,4,6,7]);
968    ///
969    /// // Optimize the expression.
970    /// map_filter_project.optimize();
971    ///
972    /// assert_eq!(
973    ///     map_filter_project,
974    ///     expected_optimized,
975    /// );
976    /// ```
977    pub fn optimize(&mut self) {
978        // Track sizes and iterate as long as they decrease.
979        let mut prev_size = None;
980        let mut self_size = usize::max_value();
981        // Continue as long as strict improvements occur.
982        while prev_size.map(|p| self_size < p).unwrap_or(true) {
983            // Lock in current size.
984            prev_size = Some(self_size);
985
986            // We have an annoying pattern of mapping literals that already exist as columns (by filters).
987            // Try to identify this pattern, of a map that introduces an expression equated to a prior column,
988            // and then replace the mapped expression by a column reference.
989            //
990            // We think this is due to `LiteralLifting`, and we might investigate removing the introduciton in
991            // the first place. The tell-tale that we see when we fix is a diff that look likes
992            //
993            // - Project (#0, #2)
994            // -   Filter (#1 = 1)
995            // -     Map (1)
996            // -       Get l0
997            // + Filter (#1 = 1)
998            // +   Get l0
999            //
1000            for (index, expr) in self.expressions.iter_mut().enumerate() {
1001                // If `expr` matches a filter equating it to a column < index + input_arity, rewrite it
1002                for (_, predicate) in self.predicates.iter() {
1003                    if let Some(col) =
1004                        E::equality_column_alias(predicate, expr, index + self.input_arity)
1005                    {
1006                        *expr = col;
1007                    }
1008                }
1009            }
1010
1011            // Optimization memoizes individual `ScalarExpr` expressions that
1012            // are sure to be evaluated, canonicalizes references to the first
1013            // occurrence of each, inlines expressions that have a reference
1014            // count of one, and then removes any expressions that are not
1015            // referenced.
1016            self.memoize_expressions();
1017            self.predicates.sort();
1018            self.predicates.dedup();
1019            self.inline_expressions();
1020            self.remove_undemanded();
1021
1022            // Re-build `self` from parts to restore evaluation order invariants.
1023            let (map, filter, project) = self.as_map_filter_project();
1024            *self = Self::new(self.input_arity)
1025                .map(map)
1026                .filter(filter)
1027                .project(project);
1028
1029            self_size = self.size();
1030        }
1031    }
1032
1033    /// Total expression sizes across all expressions.
1034    pub fn size(&self) -> usize {
1035        self.expressions
1036            .iter()
1037            .map(|e| OptimizableExpr::size(e))
1038            .sum::<usize>()
1039            + self
1040                .predicates
1041                .iter()
1042                .map(|(_, e)| OptimizableExpr::size(e))
1043                .sum::<usize>()
1044    }
1045
1046    /// Place each certainly evaluated expression in its own column.
1047    ///
1048    /// This method places each non-trivial, certainly evaluated expression
1049    /// in its own column, and deduplicates them so that all references to
1050    /// the same expression reference the same column.
1051    ///
1052    /// This transformation is restricted to expressions we are certain will
1053    /// be evaluated, which does not include expressions in `if` statements.
1054    ///
1055    /// # Example
1056    ///
1057    /// This example demonstrates how memoization notices `MirScalarExpr`s
1058    /// that are used multiple times, and ensures that each are extracted
1059    /// into columns and then referenced by column. This pass does not try
1060    /// to minimize the occurrences of column references, which will happen
1061    /// in inlining.
1062    ///
1063    /// ```rust
1064    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1065    /// // Demonstrate extraction of common expressions (here: parsing strings).
1066    /// let mut map_filter_project = MapFilterProject::new(5)
1067    ///     .map(vec![
1068    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(1).call_unary(func::CastStringToInt64), func::AddInt64),
1069    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1070    ///     ])
1071    ///     .project(vec![3,4,5,6]);
1072    ///
1073    /// let mut expected_optimized = MapFilterProject::new(5)
1074    ///     .map(vec![
1075    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1076    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1077    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), func::AddInt64),
1078    ///         MirScalarExpr::column(7),
1079    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1080    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), func::AddInt64),
1081    ///         MirScalarExpr::column(10),
1082    ///     ])
1083    ///     .project(vec![3,4,8,11]);
1084    ///
1085    /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1086    /// map_filter_project.memoize_expressions();
1087    ///
1088    /// assert_eq!(
1089    ///     map_filter_project,
1090    ///     expected_optimized,
1091    /// );
1092    /// ```
1093    ///
1094    /// Expressions may not be memoized if they are not certain to be evaluated,
1095    /// for example if they occur in conditional branches of a `MirScalarExpr::If`.
1096    ///
1097    /// ```rust
1098    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1099    /// // Demonstrate extraction of unconditionally evaluated expressions, as well as
1100    /// // the non-extraction of common expressions guarded by conditions.
1101    /// let mut map_filter_project = MapFilterProject::new(2)
1102    ///     .map(vec![
1103    ///         MirScalarExpr::If {
1104    ///             cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt)),
1105    ///             then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1106    ///             els:  Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1107    ///         },
1108    ///         MirScalarExpr::If {
1109    ///             cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt)),
1110    ///             then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1111    ///             els:  Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1112    ///         },
1113    ///     ]);
1114    ///
1115    /// let mut expected_optimized = MapFilterProject::new(2)
1116    ///     .map(vec![
1117    ///         MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt),
1118    ///         MirScalarExpr::If {
1119    ///             cond: Box::new(MirScalarExpr::column(2)),
1120    ///             then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1121    ///             els:  Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1122    ///         },
1123    ///         MirScalarExpr::column(3),
1124    ///         MirScalarExpr::If {
1125    ///             cond: Box::new(MirScalarExpr::column(2)),
1126    ///             then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1127    ///             els:  Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1128    ///         },
1129    ///         MirScalarExpr::column(5),
1130    ///     ])
1131    ///     .project(vec![0,1,4,6]);
1132    ///
1133    /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1134    /// map_filter_project.memoize_expressions();
1135    ///
1136    /// assert_eq!(
1137    ///     map_filter_project,
1138    ///     expected_optimized,
1139    /// );
1140    /// ```
1141    pub fn memoize_expressions(&mut self) {
1142        // Record the mapping from starting column references to new column
1143        // references.
1144        let mut remaps = BTreeMap::new();
1145        for index in 0..self.input_arity {
1146            remaps.insert(index, index);
1147        }
1148        let mut new_expressions = Vec::new();
1149
1150        // We follow the same order as for evaluation, to ensure that all
1151        // column references exist in time for their evaluation. We could
1152        // prioritize predicates, but we would need to be careful to chase
1153        // down column references to expressions and memoize those as well.
1154        let mut expression = 0;
1155        for (support, predicate) in self.predicates.iter_mut() {
1156            while self.input_arity + expression < *support {
1157                self.expressions[expression].permute_map(&remaps);
1158                memoize_expr(
1159                    &mut self.expressions[expression],
1160                    &mut new_expressions,
1161                    self.input_arity,
1162                );
1163                remaps.insert(
1164                    self.input_arity + expression,
1165                    self.input_arity + new_expressions.len(),
1166                );
1167                new_expressions.push(self.expressions[expression].clone());
1168                expression += 1;
1169            }
1170            predicate.permute_map(&remaps);
1171            memoize_expr(predicate, &mut new_expressions, self.input_arity);
1172        }
1173        while expression < self.expressions.len() {
1174            self.expressions[expression].permute_map(&remaps);
1175            memoize_expr(
1176                &mut self.expressions[expression],
1177                &mut new_expressions,
1178                self.input_arity,
1179            );
1180            remaps.insert(
1181                self.input_arity + expression,
1182                self.input_arity + new_expressions.len(),
1183            );
1184            new_expressions.push(self.expressions[expression].clone());
1185            expression += 1;
1186        }
1187
1188        self.expressions = new_expressions;
1189        for proj in self.projection.iter_mut() {
1190            *proj = remaps[proj];
1191        }
1192
1193        // Restore predicate order invariants.
1194        for (pos, pred) in self.predicates.iter_mut() {
1195            *pos = pred.support().last().map(|x| *x + 1).unwrap_or(0);
1196        }
1197    }
1198
1199    /// This method inlines expressions with a single use.
1200    ///
1201    /// This method only inlines expressions; it does not delete expressions
1202    /// that are no longer referenced. The `remove_undemanded()` method does
1203    /// that, and should likely be used after this method.
1204    ///
1205    /// Inlining replaces column references when the referred-to item is either
1206    /// another column reference, or the only referrer of its referent. This
1207    /// is most common after memoization has atomized all expressions to seek
1208    /// out re-use: inlining re-assembles expressions that were not helpfully
1209    /// shared with other expressions.
1210    ///
1211    /// # Example
1212    ///
1213    /// In this example, we see that with only a single reference to columns
1214    /// 0 and 2, their parsing can each be inlined. Similarly, column references
1215    /// can be cleaned up among expressions, and in the final projection.
1216    ///
1217    /// Also notice the remaining expressions, which can be cleaned up in a later
1218    /// pass (the `remove_undemanded` method).
1219    ///
1220    /// ```rust
1221    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1222    /// // Use the output from first `memoize_expression` example.
1223    /// let mut map_filter_project = MapFilterProject::new(5)
1224    ///     .map(vec![
1225    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1226    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1227    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), func::AddInt64),
1228    ///         MirScalarExpr::column(7),
1229    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1230    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), func::AddInt64),
1231    ///         MirScalarExpr::column(10),
1232    ///     ])
1233    ///     .project(vec![3,4,8,11]);
1234    ///
1235    /// let mut expected_optimized = MapFilterProject::new(5)
1236    ///     .map(vec![
1237    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1238    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1239    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1240    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1241    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1242    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1243    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1244    ///     ])
1245    ///     .project(vec![3,4,8,11]);
1246    ///
1247    /// // Inline expressions that are referenced only once.
1248    /// map_filter_project.inline_expressions();
1249    ///
1250    /// assert_eq!(
1251    ///     map_filter_project,
1252    ///     expected_optimized,
1253    /// );
1254    /// ```
1255    pub fn inline_expressions(&mut self) {
1256        // Local copy of input_arity to avoid borrowing `self` in closures.
1257        let input_arity = self.input_arity;
1258        // Reference counts track the number of places that a reference occurs.
1259        let mut reference_count = vec![0; input_arity + self.expressions.len()];
1260        // Increment reference counts for each use
1261        for expr in self.expressions.iter() {
1262            expr.visit_pre(&mut |e| {
1263                if let Some(i) = e.as_column() {
1264                    reference_count[i] += 1;
1265                }
1266            });
1267        }
1268        for (_, pred) in self.predicates.iter() {
1269            pred.visit_pre(&mut |e| {
1270                if let Some(i) = e.as_column() {
1271                    reference_count[i] += 1;
1272                }
1273            });
1274        }
1275        for proj in self.projection.iter() {
1276            reference_count[*proj] += 1;
1277        }
1278
1279        // Determine which expressions should be inlined because they reference temporal expressions.
1280        let mut is_temporal = vec![false; input_arity];
1281        for expr in self.expressions.iter() {
1282            // An express may contain a temporal expression, or reference a column containing such.
1283            is_temporal.push(
1284                OptimizableExpr::contains_temporal(expr)
1285                    || expr.support().into_iter().any(|col| is_temporal[col]),
1286            );
1287        }
1288
1289        // Inline only those columns that 1. are expressions not inputs, and
1290        // 2a. are column references or literals or 2b. have a refcount of 1,
1291        // or 2c. reference temporal expressions (which cannot be evaluated).
1292        let mut should_inline = vec![false; reference_count.len()];
1293        for i in (input_arity..reference_count.len()).rev() {
1294            if let Some(c) = self.expressions[i - input_arity].as_column() {
1295                should_inline[i] = true;
1296                // The reference count of the referenced column should be
1297                // incremented with the number of references
1298                // `self.expressions[i - input_arity]` has.
1299                // Subtract 1 because `self.expressions[i - input_arity]` is
1300                // itself a reference.
1301                reference_count[c] += reference_count[i] - 1;
1302            } else {
1303                should_inline[i] = reference_count[i] == 1 || is_temporal[i];
1304            }
1305        }
1306        // Inline expressions per `should_inline`.
1307        self.perform_inlining(should_inline);
1308        // We can only inline column references in `self.projection`, but we should.
1309        for proj in self.projection.iter_mut() {
1310            if *proj >= self.input_arity {
1311                if let Some(i) = self.expressions[*proj - self.input_arity].as_column() {
1312                    // TODO(mgree) !!! propagate name information to projection
1313                    *proj = i;
1314                }
1315            }
1316        }
1317    }
1318
1319    /// Inlines those expressions that are indicated by should_inline.
1320    /// See `inline_expressions` for usage.
1321    pub fn perform_inlining(&mut self, should_inline: Vec<bool>) {
1322        for index in 0..self.expressions.len() {
1323            let (prior, expr) = self.expressions.split_at_mut(index);
1324            expr[0].visit_mut_post(&mut |e| {
1325                if let Some(i) = e.as_column() {
1326                    if should_inline[i] {
1327                        *e = prior[i - self.input_arity].clone();
1328                    }
1329                }
1330            });
1331        }
1332        for (_index, pred) in self.predicates.iter_mut() {
1333            let expressions = &self.expressions;
1334            pred.visit_mut_post(&mut |e| {
1335                if let Some(i) = e.as_column() {
1336                    if should_inline[i] {
1337                        *e = expressions[i - self.input_arity].clone();
1338                    }
1339                }
1340            });
1341        }
1342    }
1343
1344    /// Removes unused expressions from `self.expressions`.
1345    ///
1346    /// Expressions are "used" if they are relied upon by any output columns
1347    /// or any predicates, even transitively. Any expressions that are not
1348    /// relied upon in this way can be discarded.
1349    ///
1350    /// # Example
1351    ///
1352    /// ```rust
1353    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1354    /// // Use the output from `inline_expression` example.
1355    /// let mut map_filter_project = MapFilterProject::new(5)
1356    ///     .map(vec![
1357    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1358    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1359    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1360    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1361    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1362    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1363    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1364    ///     ])
1365    ///     .project(vec![3,4,8,11]);
1366    ///
1367    /// let mut expected_optimized = MapFilterProject::new(5)
1368    ///     .map(vec![
1369    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1370    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(5), func::AddInt64),
1371    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1372    ///     ])
1373    ///     .project(vec![3,4,6,7]);
1374    ///
1375    /// // Remove undemanded expressions, streamlining the work done..
1376    /// map_filter_project.remove_undemanded();
1377    ///
1378    /// assert_eq!(
1379    ///     map_filter_project,
1380    ///     expected_optimized,
1381    /// );
1382    /// ```
1383    pub fn remove_undemanded(&mut self) {
1384        // Determine the demanded expressions to remove irrelevant ones.
1385        let mut demand = BTreeSet::new();
1386        for (_index, pred) in self.predicates.iter() {
1387            demand.extend(pred.support());
1388        }
1389        // Start from the output columns as presumed demanded.
1390        // If this is not the case, the caller should project some away.
1391        demand.extend(self.projection.iter().cloned());
1392        // Proceed in *reverse* order, as expressions may depend on other
1393        // expressions that precede them.
1394        for index in (0..self.expressions.len()).rev() {
1395            if demand.contains(&(self.input_arity + index)) {
1396                demand.extend(self.expressions[index].support());
1397            }
1398        }
1399
1400        // Maintain a map from initial column identifiers to locations
1401        // once we have removed undemanded expressions.
1402        let mut remap = BTreeMap::new();
1403        // This map only needs to map elements of `demand` to a new location,
1404        // but the logic is easier if we include all input columns (as the
1405        // new position is then determined by the size of the map).
1406        for index in 0..self.input_arity {
1407            remap.insert(index, index);
1408        }
1409        // Retain demanded expressions, and record their new locations.
1410        let mut new_expressions = Vec::new();
1411        for (index, expr) in self.expressions.drain(..).enumerate() {
1412            if demand.contains(&(index + self.input_arity)) {
1413                remap.insert(index + self.input_arity, remap.len());
1414                new_expressions.push(expr);
1415            }
1416        }
1417        self.expressions = new_expressions;
1418
1419        // Update column identifiers; rebuild `Self` to re-establish any invariants.
1420        // We mirror `self.permute(&remap)` but we specifically want to remap columns
1421        // that are produced by `self.expressions` after the input columns.
1422        let (expressions, predicates, projection) = self.as_map_filter_project();
1423        *self = Self::new(self.input_arity)
1424            .map(expressions.into_iter().map(|mut e| {
1425                e.permute_map(&remap);
1426                e
1427            }))
1428            .filter(predicates.into_iter().map(|mut p| {
1429                p.permute_map(&remap);
1430                p
1431            }))
1432            .project(projection.into_iter().map(|c| remap[&c]));
1433    }
1434}
1435
1436// TODO: move this elsewhere?
1437/// Recursively memoize parts of `expr`, storing those parts in `memoized_parts`.
1438///
1439/// A part of `expr` that is memoized is replaced by a reference to column
1440/// `(input_arity + pos)`, where `pos` is the position of the memoized part in
1441/// `memoized_parts`, and `input_arity` is the arity of the input that `expr`
1442/// refers to.
1443pub fn memoize_expr<E: OptimizableExpr>(
1444    expr: &mut E,
1445    memoized_parts: &mut Vec<E>,
1446    input_arity: usize,
1447) {
1448    expr.visit_mut_pre_post(&mut |e| e.eager_children(), &mut |e| {
1449        if E::is_literal(e) {
1450            // Literals do not need to be memoized.
1451            return;
1452        }
1453        if let Some(col) = e.as_column_mut() {
1454            // Column references do not need to be memoized, but may need to be
1455            // updated if they reference a column reference themselves.
1456            if *col > input_arity {
1457                if let Some(col2) = memoized_parts[*col - input_arity].as_column() {
1458                    // Update the column index in place, preserving any name information.
1459                    *col = col2;
1460                }
1461            }
1462            return;
1463        }
1464        // TODO: OOO (Optimizer Optimization Opportunity):
1465        // we are quadratic in expression size because of this .iter().position
1466        if let Some(position) = memoized_parts.iter().position(|e2| e2 == e) {
1467            // Any complex expression that already exists as a prior column can
1468            // be replaced by a reference to that column.
1469            *e = E::column(input_arity + position);
1470        } else {
1471            // A complex expression that does not exist should be memoized, and
1472            // replaced by a reference to the column.
1473            memoized_parts.push(std::mem::replace(
1474                e,
1475                E::column(input_arity + memoized_parts.len()),
1476            ));
1477        }
1478    });
1479}
1480
1481pub mod util {
1482    use std::collections::BTreeMap;
1483
1484    use crate::MirScalarExpr;
1485    use crate::scalar::columns::Columns;
1486
    // Nothing in this module constructs this type, hence the lint exemption.
    #[allow(dead_code)]
    /// A triple of actions that map from rows to (key, val) pairs and back again.
    ///
    /// `to_key` and `to_val` split a row into its key and value parts;
    /// `to_row` undoes the split.
    struct KeyValRowMapping {
        /// Expressions to apply to a row to produce key datums.
        to_key: Vec<MirScalarExpr>,
        /// Columns to project from a row to produce residual value datums.
        to_val: Vec<usize>,
        /// Columns to project from the concatenation of key and value to reconstruct the row.
        to_row: Vec<usize>,
    }
1497
1498    /// Derive supporting logic to support transforming rows to (key, val) pairs,
1499    /// and back again.
1500    ///
1501    /// We are given as input a list of mappings from columns to key indices, a key length,
1502    /// and an input arity. (The produced key should be the application of the key expressions.)
1503    ///
1504    /// To produce the `val` output, we will identify those input columns not found in
1505    /// the key expressions, and name all other columns.
1506    /// To reconstitute the original row, we identify the sequence of columns from the
1507    /// concatenation of key and val which would reconstruct the original row.
1508    ///
1509    /// The output is a pair of column sequences, the first used to reconstruct a row
1510    /// from the concatenation of key and value, and the second to identify the columns
1511    /// of a row that should become the value associated with its key.
1512    ///
1513    /// The permutations and thinning expressions generated here will be tracked in
1514    /// `compute_types::plan::AvailableCollections`; see the
1515    /// documentation there for more details.
1516    pub fn permutation_for_arrangement(
1517        key: &[impl Columns],
1518        unthinned_arity: usize,
1519    ) -> (Vec<usize>, Vec<usize>) {
1520        let columns_in_key: BTreeMap<_, _> = key
1521            .iter()
1522            .enumerate()
1523            .filter_map(|(i, key_col)| key_col.as_column().map(|c| (c, i)))
1524            .collect();
1525
1526        let mut input_cursor = key.len();
1527        let permutation = (0..unthinned_arity)
1528            .map(|c| {
1529                if let Some(c) = columns_in_key.get(&c) {
1530                    // Column is in key (and thus gone from the value
1531                    // of the thinned representation)
1532                    *c
1533                } else {
1534                    // Column remains in value of the thinned representation
1535                    input_cursor += 1;
1536                    input_cursor - 1
1537                }
1538            })
1539            .collect();
1540        let thinning = (0..unthinned_arity)
1541            .filter(|c| !columns_in_key.contains_key(c))
1542            .collect();
1543        (permutation, thinning)
1544    }
1545
1546    /// Given the permutations (see [`permutation_for_arrangement`] and
1547    /// (`dataflow::plan::AvailableCollections`) corresponding to two
1548    /// collections with the same key arity,
1549    /// computes the permutation for the result of joining them.
1550    pub fn join_permutations(
1551        key_arity: usize,
1552        stream_permutation: Vec<usize>,
1553        thinned_stream_arity: usize,
1554        lookup_permutation: Vec<usize>,
1555    ) -> BTreeMap<usize, usize> {
1556        let stream_arity = stream_permutation.len();
1557        let lookup_arity = lookup_permutation.len();
1558
1559        (0..stream_arity + lookup_arity)
1560            .map(|i| {
1561                let location = if i < stream_arity {
1562                    stream_permutation[i]
1563                } else {
1564                    let location_in_lookup = lookup_permutation[i - stream_arity];
1565                    if location_in_lookup < key_arity {
1566                        location_in_lookup
1567                    } else {
1568                        location_in_lookup + thinned_stream_arity
1569                    }
1570                };
1571                (i, location)
1572            })
1573            .collect()
1574    }
1575}
1576
1577pub mod plan {
1578    use std::iter;
1579
1580    use mz_repr::{Datum, Diff, Row, RowArena};
1581    use serde::{Deserialize, Serialize};
1582
1583    use crate::Eval;
1584    use crate::scalar::optimizable::OptimizableExpr;
1585    use crate::{EvalError, MapFilterProject, MirScalarExpr};
1586
    /// A wrapper type which indicates it is safe to simply evaluate all expressions.
    ///
    /// The wrapper itself performs no checks; it is a marker applied by code
    /// that has established that property (for example `MfpPlan::create_from`,
    /// which removes temporal predicates before wrapping).
    #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
    #[serde(bound(deserialize = "E: serde::de::DeserializeOwned"))]
    pub struct SafeMfpPlan<E: OptimizableExpr = MirScalarExpr> {
        /// The wrapped operator.
        pub(crate) mfp: MapFilterProject<E>,
    }
1593
    /// Construction, deconstruction and simple delegating helpers that do not
    /// require expression evaluation.
    impl<E: OptimizableExpr> SafeMfpPlan<E> {
        /// Wrap a `MapFilterProject` in a `SafeMfpPlan`.
        ///
        /// No validation is performed: the caller asserts that every
        /// expression and predicate in `mfp` can be evaluated directly.
        pub fn from_mfp(mfp: MapFilterProject<E>) -> Self {
            Self { mfp }
        }

        /// Unwrap the inner `MapFilterProject`.
        pub fn into_mfp(self) -> MapFilterProject<E> {
            self.mfp
        }

        /// Remaps references to input columns according to `remap`.
        ///
        /// Leaves other column references, e.g. to newly mapped columns, unchanged.
        ///
        /// Delegates to `MapFilterProject::permute_fn`, forwarding `new_arity`.
        pub fn permute_fn<F>(&mut self, remap: F, new_arity: usize)
        where
            F: Fn(usize) -> usize,
        {
            self.mfp.permute_fn(remap, new_arity);
        }

        /// Returns true when `Self` is the identity.
        ///
        /// Delegates to `MapFilterProject::is_identity`.
        pub fn is_identity(&self) -> bool {
            self.mfp.is_identity()
        }
    }
1620
1621    impl<E: OptimizableExpr + Eval> SafeMfpPlan<E> {
1622        /// Evaluates the linear operator on a supplied list of datums.
1623        ///
1624        /// The arguments are the initial datums associated with the row,
1625        /// and an appropriately lifetimed arena for temporary allocations
1626        /// needed by scalar evaluation.
1627        ///
1628        /// An `Ok` result will either be `None` if any predicate did not
1629        /// evaluate to `Datum::True`, or the values of the columns listed
1630        /// by `self.projection` if all predicates passed. If an error
1631        /// occurs in the evaluation it is returned as an `Err` variant.
1632        /// As the evaluation exits early with failed predicates, it may
1633        /// miss some errors that would occur later in evaluation.
1634        ///
1635        /// The `row` is not cleared first, but emptied if the function
1636        /// returns `Ok(Some(row)).
1637        #[inline(always)]
1638        pub fn evaluate_into<'a, 'row>(
1639            &'a self,
1640            datums: &mut Vec<Datum<'a>>,
1641            arena: &'a RowArena,
1642            row_buf: &'row mut Row,
1643        ) -> Result<Option<&'row Row>, EvalError> {
1644            let passed_predicates = self.evaluate_inner(datums, arena)?;
1645            if !passed_predicates {
1646                Ok(None)
1647            } else {
1648                row_buf
1649                    .packer()
1650                    .extend(self.mfp.projection.iter().map(|c| datums[*c]));
1651                Ok(Some(row_buf))
1652            }
1653        }
1654
1655        /// A version of `evaluate` which produces an iterator over `Datum`
1656        /// as output.
1657        ///
1658        /// This version can be useful when one wants to capture the resulting
1659        /// datums without packing and then unpacking a row.
1660        #[inline(always)]
1661        pub fn evaluate_iter<'b, 'a: 'b>(
1662            &'a self,
1663            datums: &'b mut Vec<Datum<'a>>,
1664            arena: &'a RowArena,
1665        ) -> Result<Option<impl Iterator<Item = Datum<'a>> + 'b>, EvalError> {
1666            let passed_predicates = self.evaluate_inner(datums, arena)?;
1667            if !passed_predicates {
1668                Ok(None)
1669            } else {
1670                Ok(Some(self.mfp.projection.iter().map(move |i| datums[*i])))
1671            }
1672        }
1673
1674        /// Populates `datums` with `self.expressions` and tests `self.predicates`.
1675        ///
1676        /// This does not apply `self.projection`, which is up to the calling method.
1677        pub fn evaluate_inner<'b, 'a: 'b>(
1678            &'a self,
1679            datums: &'b mut Vec<Datum<'a>>,
1680            arena: &'a RowArena,
1681        ) -> Result<bool, EvalError> {
1682            let mut expression = 0;
1683            for (support, predicate) in self.mfp.predicates.iter() {
1684                while self.mfp.input_arity + expression < *support {
1685                    datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
1686                    expression += 1;
1687                }
1688                if predicate.eval(&datums[..], arena)? != Datum::True {
1689                    return Ok(false);
1690                }
1691            }
1692            while expression < self.mfp.expressions.len() {
1693                datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
1694                expression += 1;
1695            }
1696            Ok(true)
1697        }
1698
1699        /// Returns true if evaluation could introduce an error on non-error inputs.
1700        pub fn could_error(&self) -> bool {
1701            self.mfp.predicates.iter().any(|(_pos, e)| e.could_error())
1702                || self.mfp.expressions.iter().any(|e| e.could_error())
1703        }
1704    }
1705
    /// Gives read-only access to the fields and `&self` methods of the
    /// wrapped `MapFilterProject` directly on a `SafeMfpPlan`.
    impl<E: OptimizableExpr> std::ops::Deref for SafeMfpPlan<E> {
        type Target = MapFilterProject<E>;
        /// Returns a shared reference to the wrapped `MapFilterProject`.
        fn deref(&self) -> &Self::Target {
            &self.mfp
        }
    }
1712
    /// Predicates partitioned into temporal and non-temporal.
    ///
    /// Temporal predicates require some recognition to determine their
    /// structure, and it is best to do that once and re-use the results.
    ///
    /// There are restrictions on the temporal predicates we currently support.
    /// They must directly constrain `MzNow` from below or above,
    /// by expressions that do not themselves contain `MzNow`.
    /// Conjunctions of such constraints are also ok.
    ///
    /// Use `MfpPlan::create_from` to derive a plan from a `MapFilterProject`.
    #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
    #[serde(bound(deserialize = "E: serde::de::DeserializeOwned"))]
    pub struct MfpPlan<E: OptimizableExpr = MirScalarExpr> {
        /// The non-temporal part of the operator. Its predicates are
        /// evaluated on `&[Datum]` and are expected to yield `Ok(Datum::True)`.
        pub(crate) mfp: SafeMfpPlan<E>,
        /// Expressions that when evaluated lower-bound or equal (<=) `MzNow`.
        pub(crate) lower_bounds: Vec<E>,
        /// Expressions that when evaluated strictly upper-bound `MzNow`.
        pub(crate) upper_bounds: Vec<E>,
    }
1732
1733    impl<E: OptimizableExpr> MfpPlan<E> {
1734        /// Construct an `MfpPlan` from its components.
1735        pub fn from_parts(mfp: SafeMfpPlan<E>, lower_bounds: Vec<E>, upper_bounds: Vec<E>) -> Self {
1736            Self {
1737                mfp,
1738                lower_bounds,
1739                upper_bounds,
1740            }
1741        }
1742
1743        /// Deconstruct into components.
1744        pub fn into_parts(self) -> (SafeMfpPlan<E>, Vec<E>, Vec<E>) {
1745            (self.mfp, self.lower_bounds, self.upper_bounds)
1746        }
1747
1748        /// Access the inner `SafeMfpPlan`.
1749        pub fn safe_mfp(&self) -> &SafeMfpPlan<E> {
1750            &self.mfp
1751        }
1752
1753        /// Borrow all parts: `(safe_mfp, lower_bounds, upper_bounds)`.
1754        pub fn as_parts(&self) -> (&SafeMfpPlan<E>, &[E], &[E]) {
1755            (&self.mfp, &self.lower_bounds, &self.upper_bounds)
1756        }
1757
1758        /// Partitions `predicates` into non-temporal, and lower and upper temporal bounds.
1759        ///
1760        /// The first returned list is of predicates that do not contain `mz_now`.
1761        /// The second and third returned lists contain expressions that, once evaluated, lower
1762        /// and upper bound the validity interval of a record, respectively. These second two
1763        /// lists are populated only by binary expressions of the form
1764        /// ```ignore
1765        /// mz_now cmp_op expr
1766        /// ```
1767        /// where `cmp_op` is a comparison operator and `expr` does not contain `mz_now`.
1768        ///
1769        /// If any unsupported expression is found, for example one that uses `mz_now`
1770        /// in an unsupported position, an error is returned.
1771        pub fn create_from(mut mfp: MapFilterProject<E>) -> Result<Self, String> {
1772            mfp.optimize();
1773
1774            let mut temporal = Vec::new();
1775            mfp.predicates.retain(|(_position, predicate)| {
1776                if OptimizableExpr::contains_temporal(predicate) {
1777                    temporal.push(predicate.clone());
1778                    false
1779                } else {
1780                    true
1781                }
1782            });
1783
1784            let (lower_bounds, upper_bounds) = E::extract_temporal_bounds(temporal)?;
1785
1786            Ok(Self {
1787                mfp: SafeMfpPlan { mfp },
1788                lower_bounds,
1789                upper_bounds,
1790            })
1791        }
1792
1793        /// Indicates if the planned `MapFilterProject` emits exactly its inputs as outputs.
1794        pub fn is_identity(&self) -> bool {
1795            self.mfp.mfp.is_identity()
1796                && self.lower_bounds.is_empty()
1797                && self.upper_bounds.is_empty()
1798        }
1799
1800        /// Returns `true` if the plan contains temporal bounds
1801        /// (i.e., predicates involving `mz_now()`).
1802        pub fn has_temporal_bounds(&self) -> bool {
1803            !self.lower_bounds.is_empty() || !self.upper_bounds.is_empty()
1804        }
1805
1806        /// Returns `self`, and leaves behind an identity operator that acts on its output.
1807        pub fn take(&mut self) -> Self {
1808            let mut identity = Self {
1809                mfp: SafeMfpPlan {
1810                    mfp: MapFilterProject::new(self.mfp.projection.len()),
1811                },
1812                lower_bounds: Default::default(),
1813                upper_bounds: Default::default(),
1814            };
1815            std::mem::swap(self, &mut identity);
1816            identity
1817        }
1818
1819        /// Attempt to convert self into a non-temporal MapFilterProject plan.
1820        ///
1821        /// If that is not possible, the original instance is returned as an error.
1822        #[allow(clippy::result_large_err)]
1823        pub fn into_nontemporal(self) -> Result<SafeMfpPlan<E>, Self> {
1824            if self.lower_bounds.is_empty() && self.upper_bounds.is_empty() {
1825                Ok(self.mfp)
1826            } else {
1827                Err(self)
1828            }
1829        }
1830
1831        /// Returns an iterator over mutable references to all non-temporal
1832        /// scalar expressions in the plan.
1833        ///
1834        /// The order of iteration is unspecified.
1835        pub fn iter_nontemporal_exprs(&mut self) -> impl Iterator<Item = &mut E> {
1836            iter::empty()
1837                .chain(self.mfp.mfp.predicates.iter_mut().map(|(_, expr)| expr))
1838                .chain(&mut self.mfp.mfp.expressions)
1839                .chain(&mut self.lower_bounds)
1840                .chain(&mut self.upper_bounds)
1841        }
1842
1843        /// Indicates that `Self` ignores its input to the extent that it can be evaluated on `&[]`.
1844        ///
1845        /// At the moment, this is only true if it projects away all columns and applies no filters,
1846        /// but it could be extended to plans that produce literals independent of the input.
1847        pub fn ignores_input(&self) -> bool {
1848            self.lower_bounds.is_empty()
1849                && self.upper_bounds.is_empty()
1850                && self.mfp.mfp.projection.is_empty()
1851                && self.mfp.mfp.predicates.is_empty()
1852        }
1853    }
1854
    impl<E: OptimizableExpr + Eval> MfpPlan<E> {
        /// Evaluate the predicates, temporal and non-, and return times and differences
        /// for the row held in `datums`.
        ///
        /// If `self` contains only non-temporal predicates, the result will either be `(time, diff)`,
        /// or an evaluation error. If `self` contains temporal predicates, the results can be times
        /// that are greater than the input `time`, and may contain negated `diff` values.
        ///
        /// The returned iterator yields at most two elements:
        ///
        /// * nothing, if a non-temporal predicate fails, if any bound
        ///   evaluates to `Datum::Null`, if `valid_time` rejects the lower
        ///   bound, or if the lower and upper bounds coincide;
        /// * a single `Err((error, time, diff))` if any evaluation fails;
        /// * otherwise `Ok((row, lower, diff))`, followed by
        ///   `Ok((row, upper, -diff))` when there is an upper bound that
        ///   `valid_time` accepts.
        ///
        /// `datums` is extended with the values of the mapped expressions.
        ///
        /// The `row_builder` is not cleared first; it is only written to
        /// (through `packer()`) when the function returns an iterator with
        /// any `Ok(_)` element.
        ///
        /// # Panics
        ///
        /// Panics if a bound expression evaluates to anything other than an
        /// `MzTimestamp` or `Null`.
        pub fn evaluate<'b, 'a: 'b, Err: From<EvalError>, V: Fn(&mz_repr::Timestamp) -> bool>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
            time: mz_repr::Timestamp,
            diff: Diff,
            valid_time: V,
            row_builder: &mut Row,
        ) -> impl Iterator<
            Item = Result<(Row, mz_repr::Timestamp, Diff), (Err, mz_repr::Timestamp, Diff)>,
        > + use<Err, V, E> {
            // Every exit uses `Option::into_iter().chain(Option)` so that all
            // branches produce the same concrete iterator type.
            match self.mfp.evaluate_inner(datums, arena) {
                Err(e) => {
                    return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                }
                Ok(true) => {}
                Ok(false) => {
                    return None.into_iter().chain(None);
                }
            }

            // Lower and upper bounds. The record cannot appear before `time`.
            let mut lower_bound = time;
            let mut upper_bound = None;

            // Track whether we have seen a null in either bound, as this should
            // prevent the record from being produced at any time.
            let mut null_eval = false;

            // Advance our lower bound to be at least the result of any lower bound
            // expressions.
            for l in self.lower_bounds.iter() {
                match l.eval(datums, arena) {
                    Err(e) => {
                        return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                    }
                    Ok(Datum::MzTimestamp(d)) => {
                        lower_bound = lower_bound.max(d);
                    }
                    Ok(Datum::Null) => {
                        null_eval = true;
                    }
                    x => {
                        panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
                    }
                }
            }

            // If the lower bound exceeds our `until` frontier, it should not appear in the output.
            if !valid_time(&lower_bound) {
                return None.into_iter().chain(None);
            }

            // If there are any upper bounds, determine the minimum upper bound.
            for u in self.upper_bounds.iter() {
                // We can cease as soon as the lower and upper bounds match,
                // as the update will certainly not be produced in that case.
                // Note that the remaining upper bounds are then not evaluated
                // at all, so their errors and nulls go unobserved.
                if upper_bound != Some(lower_bound) {
                    match u.eval(datums, arena) {
                        Err(e) => {
                            return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                        }
                        Ok(Datum::MzTimestamp(d)) => {
                            if let Some(upper) = upper_bound {
                                upper_bound = Some(upper.min(d));
                            } else {
                                upper_bound = Some(d);
                            };
                            // Force the upper bound to be at least the lower
                            // bound. The `is_some()` test should always be true
                            // due to the above block, but maintain it here in
                            // case that changes. It's hopefully optimized away.
                            if upper_bound.is_some() && upper_bound < Some(lower_bound) {
                                upper_bound = Some(lower_bound);
                            }
                        }
                        Ok(Datum::Null) => {
                            null_eval = true;
                        }
                        x => {
                            panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
                        }
                    }
                }
            }

            // If the upper bound exceeds our `until` frontier, the retraction
            // should not appear in the output; only the update at the lower
            // bound is then produced.
            if let Some(upper) = &mut upper_bound {
                if !valid_time(upper) {
                    upper_bound = None;
                }
            }

            // Produce an output only if the upper bound exceeds the lower bound,
            // and if we did not encounter a `null` in our evaluation.
            if Some(lower_bound) != upper_bound && !null_eval {
                row_builder
                    .packer()
                    .extend(self.mfp.mfp.projection.iter().map(|c| datums[*c]));
                // The retraction at the upper bound, if there is one.
                let upper_opt =
                    upper_bound.map(|upper_bound| Ok((row_builder.clone(), upper_bound, -diff)));
                // The update at the lower bound.
                let lower = Some(Ok((row_builder.clone(), lower_bound, diff)));
                lower.into_iter().chain(upper_opt)
            } else {
                None.into_iter().chain(None)
            }
        }

        /// Returns true if evaluation could introduce an error on non-error inputs.
        ///
        /// Considers the inner `SafeMfpPlan` as well as every lower and upper
        /// bound expression.
        pub fn could_error(&self) -> bool {
            self.mfp.could_error()
                || self.lower_bounds.iter().any(|e| e.could_error())
                || self.upper_bounds.iter().any(|e| e.could_error())
        }
    }
1979}