// mz_expr/linear.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Display;

use mz_repr::{Datum, Row};
use serde::{Deserialize, Serialize};

use crate::visit::Visit;
use crate::{MirRelationExpr, MirScalarExpr};
17
/// A compound operator that can be applied row-by-row.
///
/// This operator integrates the map, filter, and project operators.
/// It applies a sequence of map expressions, which are allowed to
/// refer to previous expressions, interleaved with predicates which
/// must be satisfied for an output to be produced. If all predicates
/// evaluate to `Datum::True` the data at the identified columns are
/// collected and produced as output in a packed `Row`.
///
/// This operator is a "builder" and its contents may contain expressions
/// that are not yet executable. For example, it may contain temporal
/// expressions in `self.expressions`, even though this is not something
/// we can directly evaluate. The plan creation methods will defensively
/// ensure that the right thing happens.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    Hash,
    Ord,
    PartialOrd
)]
pub struct MapFilterProject {
    /// A sequence of expressions that should be appended to the row.
    ///
    /// Many of these expressions may not be produced in the output,
    /// and may only be present as common subexpressions.
    pub expressions: Vec<MirScalarExpr>,
    /// Expressions that must evaluate to `Datum::True` for the output
    /// row to be produced.
    ///
    /// Each entry is prepended with a column identifier indicating
    /// the column *before* which the predicate should first be applied.
    /// Most commonly this would be one plus the largest column identifier
    /// in the predicate's support, but it could be larger to implement
    /// guarded evaluation of predicates.
    ///
    /// This list should be sorted by the first field.
    pub predicates: Vec<(usize, MirScalarExpr)>,
    /// A sequence of column identifiers whose data form the output row.
    pub projection: Vec<usize>,
    /// The expected number of input columns.
    ///
    /// This is needed to ensure correct identification of newly formed
    /// columns in the output.
    pub input_arity: usize,
}
68
69impl Display for MapFilterProject {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        writeln!(f, "MapFilterProject(")?;
72        writeln!(f, "  expressions:")?;
73        self.expressions
74            .iter()
75            .enumerate()
76            .try_for_each(|(i, e)| writeln!(f, "    #{} <- {},", i + self.input_arity, e))?;
77        writeln!(f, "  predicates:")?;
78        self.predicates
79            .iter()
80            .try_for_each(|(before, p)| writeln!(f, "    <before: {}> {},", before, p))?;
81        writeln!(f, "  projection: {:?}", self.projection)?;
82        writeln!(f, "  input_arity: {}", self.input_arity)?;
83        writeln!(f, ")")
84    }
85}
86
87impl MapFilterProject {
88    /// Create a no-op operator for an input of a supplied arity.
89    pub fn new(input_arity: usize) -> Self {
90        Self {
91            expressions: Vec::new(),
92            predicates: Vec::new(),
93            projection: (0..input_arity).collect(),
94            input_arity,
95        }
96    }
97
98    /// Given two mfps, return an mfp that applies one
99    /// followed by the other.
100    /// Note that the arguments are in the opposite order
101    /// from how function composition is usually written in mathematics.
102    pub fn compose(before: Self, after: Self) -> Self {
103        let (m, f, p) = after.into_map_filter_project();
104        before.map(m).filter(f).project(p)
105    }
106
107    /// True if the operator describes the identity transformation.
108    pub fn is_identity(&self) -> bool {
109        self.expressions.is_empty()
110            && self.predicates.is_empty()
111            && self.projection.len() == self.input_arity
112            && self.projection.iter().enumerate().all(|(i, p)| i == *p)
113    }
114
115    /// Retain only the indicated columns in the presented order.
116    pub fn project<I>(mut self, columns: I) -> Self
117    where
118        I: IntoIterator<Item = usize> + std::fmt::Debug,
119    {
120        self.projection = columns.into_iter().map(|c| self.projection[c]).collect();
121        self
122    }
123
124    /// Retain only rows satisfying these predicates.
125    ///
126    /// This method introduces predicates as eagerly as they can be evaluated,
127    /// which may not be desired for predicates that may cause exceptions.
128    /// If fine manipulation is required, the predicates can be added manually.
129    pub fn filter<I>(mut self, predicates: I) -> Self
130    where
131        I: IntoIterator<Item = MirScalarExpr>,
132    {
133        for mut predicate in predicates {
134            // Correct column references.
135            predicate.permute(&self.projection[..]);
136
137            // Validate column references.
138            assert!(
139                predicate
140                    .support()
141                    .into_iter()
142                    .all(|c| c < self.input_arity + self.expressions.len())
143            );
144
145            // Insert predicate as eagerly as it can be evaluated:
146            // just after the largest column in its support is formed.
147            let max_support = predicate
148                .support()
149                .into_iter()
150                .max()
151                .map(|c| c + 1)
152                .unwrap_or(0);
153            self.predicates.push((max_support, predicate))
154        }
155        // Stable sort predicates by position at which they take effect.
156        // We put literal errors at the end as a stop-gap to avoid erroring
157        // before we are able to evaluate any predicates that might prevent it.
158        self.predicates
159            .sort_by_key(|(position, predicate)| (predicate.is_literal_err(), *position));
160        self
161    }
162
163    /// Append the result of evaluating expressions to each row.
164    pub fn map<I>(mut self, expressions: I) -> Self
165    where
166        I: IntoIterator<Item = MirScalarExpr>,
167    {
168        for mut expression in expressions {
169            // Correct column references.
170            expression.permute(&self.projection[..]);
171
172            // Validate column references.
173            assert!(
174                expression
175                    .support()
176                    .into_iter()
177                    .all(|c| c < self.input_arity + self.expressions.len())
178            );
179
180            // Introduce expression and produce as output.
181            self.expressions.push(expression);
182            self.projection
183                .push(self.input_arity + self.expressions.len() - 1);
184        }
185
186        self
187    }
188
189    /// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning.
190    pub fn into_map_filter_project(self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
191        let predicates = self
192            .predicates
193            .into_iter()
194            .map(|(_pos, predicate)| predicate)
195            .collect();
196        (self.expressions, predicates, self.projection)
197    }
198
199    /// As the arguments to `Map`, `Filter`, and `Project` operators.
200    ///
201    /// In principle, this operator can be implemented as a sequence of
202    /// more elemental operators, likely less efficiently.
203    pub fn as_map_filter_project(&self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
204        self.clone().into_map_filter_project()
205    }
206
207    /// Determines if a scalar expression must be equal to a literal datum.
208    pub fn literal_constraint(&self, expr: &MirScalarExpr) -> Option<Datum<'_>> {
209        for (_pos, predicate) in self.predicates.iter() {
210            if let MirScalarExpr::CallBinary {
211                func: crate::BinaryFunc::Eq(_),
212                expr1,
213                expr2,
214            } = predicate
215            {
216                if let Some(Ok(datum1)) = expr1.as_literal() {
217                    if &**expr2 == expr {
218                        return Some(datum1);
219                    }
220                }
221                if let Some(Ok(datum2)) = expr2.as_literal() {
222                    if &**expr1 == expr {
223                        return Some(datum2);
224                    }
225                }
226            }
227        }
228        None
229    }
230
231    /// Determines if a sequence of scalar expressions must be equal to a literal row.
232    ///
233    /// This method returns `None` on an empty `exprs`, which might be surprising, but
234    /// seems to line up with its callers' expectations of that being a non-constraint.
235    /// The caller knows if `exprs` is empty, and can modify their behavior appropriately.
236    /// if they would rather have a literal empty row.
237    pub fn literal_constraints(&self, exprs: &[MirScalarExpr]) -> Option<Row> {
238        if exprs.is_empty() {
239            return None;
240        }
241        let mut row = Row::default();
242        let mut packer = row.packer();
243        for expr in exprs {
244            if let Some(literal) = self.literal_constraint(expr) {
245                packer.push(literal);
246            } else {
247                return None;
248            }
249        }
250        Some(row)
251    }
252
    /// Extracts any MapFilterProject at the root of the expression.
    ///
    /// The expression will be modified to extract any maps, filters, and
    /// projections, which will be returned as `Self`. If there are no maps,
    /// filters, or projections the method will return an identity operator.
    ///
    /// The extracted expressions may contain temporal predicates, and one
    /// should be careful not to apply them blindly.
    pub fn extract_from_expression(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
        // TODO: This could become iterative rather than recursive if
        // we were able to fuse MFP operators from below, rather than
        // from above.
        match expr {
            // Absorb a `Map` by appending its scalars to the accumulated MFP.
            MirRelationExpr::Map { input, scalars } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.map(scalars.iter().cloned()), expr)
            }
            // Absorb a `Filter` by appending its predicates.
            MirRelationExpr::Filter { input, predicates } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.filter(predicates.iter().cloned()), expr)
            }
            // Absorb a `Project` by composing it with the projection so far.
            MirRelationExpr::Project { input, outputs } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.project(outputs.iter().cloned()), expr)
            }
            // Any other operator terminates the extraction; return an
            // identity MFP over its columns.
            // TODO: The recursion is quadratic in the number of Map/Filter/Project operators due to
            // this call to `arity()`.
            x => (Self::new(x.arity()), x),
        }
    }
283
    /// Extracts an error-free MapFilterProject at the root of the expression.
    ///
    /// The expression will be modified to extract maps, filters, and projects
    /// from the root of the expression, which will be returned as `Self`. The
    /// extraction will halt if a Map or Filter containing a literal error is
    /// reached. Otherwise, the method will return an identity operator.
    ///
    /// This method is meant to be used during optimization, where it is
    /// necessary to avoid moving around maps and filters with errors.
    pub fn extract_non_errors_from_expr(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
        match expr {
            // Absorb a `Map` only if none of its scalars is a literal error.
            MirRelationExpr::Map { input, scalars }
                if scalars.iter().all(|s| !s.is_literal_err()) =>
            {
                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
                (mfp.map(scalars.iter().cloned()), expr)
            }
            // Absorb a `Filter` only if none of its predicates is a literal error.
            MirRelationExpr::Filter { input, predicates }
                if predicates.iter().all(|p| !p.is_literal_err()) =>
            {
                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
                (mfp.filter(predicates.iter().cloned()), expr)
            }
            // Projections cannot contain errors and are always absorbed.
            MirRelationExpr::Project { input, outputs } => {
                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
                (mfp.project(outputs.iter().cloned()), expr)
            }
            // Anything else (including error-bearing Map/Filter) halts the
            // extraction with an identity operator.
            x => (Self::new(x.arity()), x),
        }
    }
314
    /// Extracts an error-free MapFilterProject at the root of the expression.
    ///
    /// Differs from [MapFilterProject::extract_non_errors_from_expr] by taking and returning a
    /// mutable reference.
    pub fn extract_non_errors_from_expr_ref_mut(
        expr: &mut MirRelationExpr,
    ) -> (Self, &mut MirRelationExpr) {
        // This is essentially the same code as `extract_non_errors_from_expr`, except the seemingly
        // superfluous outer if, which works around a borrow-checker issue:
        // https://github.com/rust-lang/rust/issues/54663
        //
        // The outer `matches!` chain decides up front whether any arm of the
        // inner `match` would fire, so that the `else` branch can return
        // `expr` itself without conflicting with the inner borrows.
        if matches!(
            expr,
            MirRelationExpr::Map { input: _, scalars }
                if scalars.iter().all(|s| !s.is_literal_err())
        ) || matches!(
            expr,
            MirRelationExpr::Filter { input: _, predicates }
                if predicates.iter().all(|p| !p.is_literal_err())
        ) || matches!(expr, MirRelationExpr::Project { .. })
        {
            // The guards below mirror the `matches!` tests above exactly, so
            // one of these arms must fire.
            match expr {
                MirRelationExpr::Map { input, scalars }
                    if scalars.iter().all(|s| !s.is_literal_err()) =>
                {
                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
                    (mfp.map(scalars.iter().cloned()), expr)
                }
                MirRelationExpr::Filter { input, predicates }
                    if predicates.iter().all(|p| !p.is_literal_err()) =>
                {
                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
                    (mfp.filter(predicates.iter().cloned()), expr)
                }
                MirRelationExpr::Project { input, outputs } => {
                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
                    (mfp.project(outputs.iter().cloned()), expr)
                }
                // Unreachable: the outer condition guarantees an arm above matches.
                _ => unreachable!(),
            }
        } else {
            // No absorbable operator at the root: identity MFP over `expr`.
            (Self::new(expr.arity()), expr)
        }
    }
358
    /// Removes an error-free MapFilterProject from the root of the expression.
    ///
    /// The expression will be modified to extract maps, filters, and projects
    /// from the root of the expression, which will be returned as `Self`. The
    /// extraction will halt if a Map or Filter containing a literal error is
    /// reached. Otherwise, the method will return an
    /// identity operator, and the expression will remain unchanged.
    ///
    /// This method is meant to be used during optimization, where it is
    /// necessary to avoid moving around maps and filters with errors.
    pub fn extract_non_errors_from_expr_mut(expr: &mut MirRelationExpr) -> Self {
        match expr {
            // Absorb an error-free `Map`, then replace `expr` with its input.
            MirRelationExpr::Map { input, scalars }
                if scalars.iter().all(|s| !s.is_literal_err()) =>
            {
                let mfp =
                    Self::extract_non_errors_from_expr_mut(input).map(scalars.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            // Absorb an error-free `Filter`, then replace `expr` with its input.
            MirRelationExpr::Filter { input, predicates }
                if predicates.iter().all(|p| !p.is_literal_err()) =>
            {
                let mfp = Self::extract_non_errors_from_expr_mut(input)
                    .filter(predicates.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            // Absorb a `Project`, then replace `expr` with its input.
            MirRelationExpr::Project { input, outputs } => {
                let mfp =
                    Self::extract_non_errors_from_expr_mut(input).project(outputs.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            // Anything else halts the extraction; `expr` is left unchanged.
            x => Self::new(x.arity()),
        }
    }
396
397    /// Returns `true` if any predicate in this MFP contains a temporal expression (`mz_now()`).
398    pub fn has_temporal_predicates(&self) -> bool {
399        self.predicates
400            .iter()
401            .any(|(_, predicate)| predicate.contains_temporal())
402    }
403
    /// Extracts temporal predicates into their own `Self`.
    ///
    /// Expressions that are used by the temporal predicates are exposed by `self.projection`,
    /// though there could be justification for extracting them as well if they are otherwise
    /// unused.
    ///
    /// This separation is valuable when the execution cannot be fused into one operator.
    pub fn extract_temporal(&mut self) -> Self {
        // Optimize the expression, as it is only post-optimization that we can be certain
        // that temporal expressions are restricted to filters. We could relax this in the
        // future to be only `inline_expressions` and `remove_undemanded`, but optimization
        // seems to be the best fit at the moment.
        self.optimize();

        // Assert that we no longer have temporal expressions to evaluate. This should only
        // occur if the optimization above results with temporal expressions yielded in the
        // output, which is out of spec for how the type is meant to be used.
        assert!(!self.expressions.iter().any(|e| e.contains_temporal()));

        // Extract temporal predicates from `self.predicates`, removing them
        // from `self` in the same pass.
        let mut temporal_predicates = Vec::new();
        self.predicates.retain(|(_position, predicate)| {
            if predicate.contains_temporal() {
                temporal_predicates.push(predicate.clone());
                false
            } else {
                true
            }
        });

        // Determine extended input columns used by temporal filters.
        let mut support = BTreeSet::new();
        for predicate in temporal_predicates.iter() {
            support.extend(predicate.support());
        }

        // Discover the locations of these columns after `self.projection`,
        // appending to the projection any column not already produced.
        let old_projection_len = self.projection.len();
        let mut new_location = BTreeMap::new();
        for original in support.iter() {
            if let Some(position) = self.projection.iter().position(|x| x == original) {
                new_location.insert(*original, position);
            } else {
                new_location.insert(*original, self.projection.len());
                self.projection.push(*original);
            }
        }
        // Permute references in extracted predicates to their new locations.
        for predicate in temporal_predicates.iter_mut() {
            predicate.permute_map(&new_location);
        }

        // Form a new `Self` containing the temporal predicates to return.
        // It consumes `self`'s (extended) output and projects back down to
        // the columns `self` originally produced.
        Self::new(self.projection.len())
            .filter(temporal_predicates)
            .project(0..old_projection_len)
    }
461
    /// Extracts common expressions from multiple `Self` into a result `Self`.
    ///
    /// The argument `mfps` are mutated so that each are functionally equivalent to their
    /// corresponding input, when composed atop the resulting `Self`.
    ///
    /// # Panics
    ///
    /// Panics if `mfps` is empty.
    pub fn extract_common(mfps: &mut [&mut Self]) -> Self {
        match mfps.len() {
            0 => {
                panic!("Cannot call method on empty arguments");
            }
            1 => {
                // A single mfp is extracted wholesale, leaving behind an
                // identity operator over its output.
                let output_arity = mfps[0].projection.len();
                std::mem::replace(mfps[0], MapFilterProject::new(output_arity))
            }
            _ => {
                // More generally, we convert each mfp to ANF, at which point we can
                // repeatedly extract atomic expressions that depend only on input
                // columns, migrate them to an input mfp, and repeat until no such
                // expressions exist. At this point, we can also migrate predicates
                // and then determine and push down projections.

                // Prepare a return `Self`.
                let mut result_mfp = MapFilterProject::new(mfps[0].input_arity);

                // We convert each mfp to ANF, using `memoize_expressions`.
                for mfp in mfps.iter_mut() {
                    mfp.memoize_expressions();
                }

                // We repeatedly extract common expressions, until none remain.
                let mut done = false;
                while !done {
                    // We use references to determine common expressions, and must
                    // introduce a scope here to drop the borrows before mutation.
                    let common = {
                        // The input arity may increase as we iterate, so recapture.
                        let input_arity = result_mfp.projection.len();
                        // Candidates: expressions of the first mfp that depend only
                        // on columns already produced by `result_mfp`.
                        let mut prev: BTreeSet<_> = mfps[0]
                            .expressions
                            .iter()
                            .filter(|e| e.support().iter().max() < Some(&input_arity))
                            .collect();
                        // Intersect the candidate set with each remaining mfp.
                        let mut next = BTreeSet::default();
                        for mfp in mfps[1..].iter() {
                            for expr in mfp.expressions.iter() {
                                if prev.contains(expr) {
                                    next.insert(expr);
                                }
                            }
                            std::mem::swap(&mut prev, &mut next);
                            next.clear();
                        }
                        prev.into_iter().cloned().collect::<Vec<_>>()
                    };
                    // Without new common expressions, we should terminate the loop.
                    done = common.is_empty();

                    // Migrate each expression in `common` to `result_mfp`.
                    for expr in common.into_iter() {
                        // Update each mfp by removing expr and updating column references.
                        for mfp in mfps.iter_mut() {
                            // With `expr` next in `result_mfp`, it is as if we are rotating it to
                            // be the first expression in `mfp`, and then removing it from `mfp` and
                            // increasing the input arity of `mfp`.
                            let arity = result_mfp.projection.len();
                            let found = mfp.expressions.iter().position(|e| e == &expr).unwrap();
                            let index = arity + found;
                            // Column references change due to the rotation from `index` to `arity`.
                            let action = |c: &mut usize| {
                                if arity <= *c && *c < index {
                                    *c += 1;
                                } else if *c == index {
                                    *c = arity;
                                }
                            };
                            // Rotate `expr` from `found` to first, and then snip.
                            // Short circuit by simply removing and incrementing the input arity.
                            mfp.input_arity += 1;
                            mfp.expressions.remove(found);
                            // Update column references in expressions, predicates, and projections.
                            for e in mfp.expressions.iter_mut() {
                                e.visit_columns(action);
                            }
                            for (o, e) in mfp.predicates.iter_mut() {
                                e.visit_columns(action);
                                // Max out the offset for the predicate; optimization will correct.
                                *o = mfp.input_arity + mfp.expressions.len();
                            }
                            for c in mfp.projection.iter_mut() {
                                action(c);
                            }
                        }
                        // Install the expression and expose it in the projection.
                        result_mfp.expressions.push(expr);
                        result_mfp.projection.push(result_mfp.projection.len());
                    }
                }
                // As before, but easier: predicates in common to all mfps.
                let common_preds: Vec<MirScalarExpr> = {
                    let input_arity = result_mfp.projection.len();
                    // Candidates: predicates of the first mfp supported by
                    // columns `result_mfp` already produces.
                    let mut prev: BTreeSet<_> = mfps[0]
                        .predicates
                        .iter()
                        .map(|(_, e)| e)
                        .filter(|e| e.support().iter().max() < Some(&input_arity))
                        .collect();
                    let mut next = BTreeSet::default();
                    for mfp in mfps[1..].iter() {
                        for (_, expr) in mfp.predicates.iter() {
                            if prev.contains(expr) {
                                next.insert(expr);
                            }
                        }
                        std::mem::swap(&mut prev, &mut next);
                        next.clear();
                    }
                    // Predicates in common, that we will append to `result_mfp.predicates`.
                    prev.into_iter().cloned().collect::<Vec<_>>()
                };
                for mfp in mfps.iter_mut() {
                    mfp.predicates.retain(|(_, p)| !common_preds.contains(p));
                    mfp.optimize();
                }
                result_mfp.predicates.extend(
                    common_preds
                        .into_iter()
                        .map(|e| (result_mfp.projection.len(), e)),
                );

                // Then, look for unused columns and project them away.
                let mut common_demand = BTreeSet::new();
                for mfp in mfps.iter() {
                    common_demand.extend(mfp.demand());
                }
                // columns in `common_demand` must be retained, but others
                // may be discarded.
                let common_demand = (0..result_mfp.projection.len())
                    .filter(|x| common_demand.contains(x))
                    .collect::<Vec<_>>();
                // Map retained columns from old positions to new, dense positions.
                let remap = common_demand
                    .iter()
                    .cloned()
                    .enumerate()
                    .map(|(new, old)| (old, new))
                    .collect::<BTreeMap<_, _>>();
                for mfp in mfps.iter_mut() {
                    mfp.permute_fn(|c| remap[&c], common_demand.len());
                }
                result_mfp = result_mfp.project(common_demand);

                // Return the resulting MFP.
                result_mfp.optimize();
                result_mfp
            }
        }
    }
619
620    /// Returns `self`, and leaves behind an identity operator that acts on its output.
621    pub fn take(&mut self) -> Self {
622        let mut identity = Self::new(self.projection.len());
623        std::mem::swap(self, &mut identity);
624        identity
625    }
626
    /// Convert the `MapFilterProject` into a staged evaluation plan.
    ///
    /// The main behavior is extract temporal predicates, which cannot be evaluated
    /// using the standard machinery.
    ///
    /// # Errors
    ///
    /// Propagates any error string produced by `plan::MfpPlan::create_from`.
    pub fn into_plan(self) -> Result<plan::MfpPlan, String> {
        plan::MfpPlan::create_from(self)
    }
634}
635
636impl MapFilterProject {
637    /// Partitions `self` into two instances, one of which can be eagerly applied.
638    ///
639    /// The `available` argument indicates which input columns are available (keys)
640    /// and in which positions (values). This information may allow some maps and
641    /// filters to execute. The `input_arity` argument reports the total number of
642    /// input columns (which may include some not present in `available`)
643    ///
644    /// This method partitions `self` in two parts, `(before, after)`, where `before`
645    /// can be applied on columns present as keys in `available`, and `after` must
646    /// await the introduction of the other input columns.
647    ///
648    /// The `before` instance will *append* any columns that can be determined from
649    /// `available` but will project away any of these columns that are not needed by
650    /// `after`. Importantly, this means that `before` will leave intact *all* input
651    /// columns including those not referenced in `available`.
652    ///
653    /// The `after` instance will presume all input columns are available, followed
654    /// by the appended columns of the `before` instance. It may be that some input
655    /// columns can be projected away in `before` if `after` does not need them, but
656    /// we leave that as something the caller can apply if needed (it is otherwise
657    /// complicated to negotiate which input columns `before` should retain).
658    ///
659    /// To correctly reconstruct `self` from `before` and `after`, one must introduce
660    /// additional input columns, permute all input columns to their locations as
661    /// expected by `self`, follow this by new columns appended by `before`, and
662    /// remove all other columns that may be present.
663    ///
664    /// # Example
665    ///
666    /// ```rust
667    /// use mz_expr::{BinaryFunc, MapFilterProject, MirScalarExpr, func};
668    ///
669    /// // imagine an action on columns (a, b, c, d).
670    /// let original = MapFilterProject::new(4).map(vec![
671    ///    MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::AddInt64),
672    ///    MirScalarExpr::column(2).call_binary(MirScalarExpr::column(4), func::AddInt64),
673    ///    MirScalarExpr::column(3).call_binary(MirScalarExpr::column(5), func::AddInt64),
674    /// ]).project(vec![6]);
675    ///
676    /// // Imagine we start with columns (b, x, a, y, c).
677    /// //
678    /// // The `partition` method requires a map from *expected* input columns to *actual*
679    /// // input columns. In the example above, the columns a, b, and c exist, and are at
680    /// // locations 2, 0, and 4 respectively. We must construct a map to this effect.
681    /// let mut available_columns = std::collections::BTreeMap::new();
682    /// available_columns.insert(0, 2);
683    /// available_columns.insert(1, 0);
684    /// available_columns.insert(2, 4);
685    /// // Partition `original` using the available columns and current input arity.
686    /// // This informs `partition` which columns are available, where they can be found,
687    /// // and how many columns are not relevant but should be preserved.
688    /// let (before, after) = original.partition(available_columns, 5);
689    ///
690    /// // `before` sees all five input columns, and should append `a + b + c`.
691    /// assert_eq!(before, MapFilterProject::new(5).map(vec![
692    ///    MirScalarExpr::column(2).call_binary(MirScalarExpr::column(0), func::AddInt64),
693    ///    MirScalarExpr::column(4).call_binary(MirScalarExpr::column(5), func::AddInt64),
694    /// ]).project(vec![0, 1, 2, 3, 4, 6]));
695    ///
696    /// // `after` expects to see `(a, b, c, d, a + b + c)`.
697    /// assert_eq!(after, MapFilterProject::new(5).map(vec![
698    ///    MirScalarExpr::column(3).call_binary(MirScalarExpr::column(4), func::AddInt64)
699    /// ]).project(vec![5]));
700    ///
701    /// // To reconstruct `self`, we must introduce the columns that are not present,
702    /// // and present them in the order intended by `self`. In this example, we must
703    /// // introduce column d and permute the columns so that they begin (a, b, c, d).
704    /// // The columns x and y must be projected away, and any columns introduced by
    /// // `before` must be retained in their current order.
706    ///
707    /// // The `after` instance expects to be provided with all inputs, but it
708    /// // may not need all inputs. The `demand()` and `permute()` methods can
709    /// // optimize the representation.
710    /// ```
    pub fn partition(self, available: BTreeMap<usize, usize>, input_arity: usize) -> (Self, Self) {
        // Map expressions, filter predicates, and projections for `before` and `after`.
        let mut before_expr = Vec::new();
        let mut before_pred = Vec::new();
        let mut before_proj = Vec::new();
        let mut after_expr = Vec::new();
        let mut after_pred = Vec::new();
        let mut after_proj = Vec::new();

        // Track which output columns must be preserved in the output of `before`.
        let mut demanded = BTreeSet::new();
        demanded.extend(0..self.input_arity);
        demanded.extend(self.projection.iter());

        // Determine which map expressions can be computed from the available subset.
        // Some expressions may depend on other expressions, but by evaluating them
        // in forward order we should accurately determine the available expressions.
        //
        // `available_expr` is indexed in `self`'s coordinates: entries
        // `0 .. self.input_arity` describe input columns (seeded from `available`),
        // and the loop below pushes one additional entry per map expression.
        let mut available_expr = vec![false; self.input_arity];
        // Initialize available columns from `available`, which is then not used again.
        for index in available.keys() {
            available_expr[*index] = true;
        }
        for expr in self.expressions.into_iter() {
            // We treat an expression as available if its supporting columns are available,
            // and if it is not a literal (we want to avoid pushing down literals). This
            // choice is ad-hoc, but the intent is that we partition the operators so
            // that we can reduce the row representation size and total computation.
            // Pushing down literals harms the former and does nothing for the latter.
            // In the future, we'll want to have a harder think about this trade-off, as
            // we are certainly making sub-optimal decisions by pushing down all available
            // work.
            // TODO(mcsherry): establish better principles about what work to push down.
            let is_available =
                expr.support().into_iter().all(|i| available_expr[i]) && !expr.is_literal();
            if is_available {
                before_expr.push(expr);
            } else {
                // Unavailable expressions will run in `after`, which demands the
                // columns they reference from the output of `before`.
                demanded.extend(expr.support());
                after_expr.push(expr);
            }
            available_expr.push(is_available);
        }

        // Determine which predicates can be computed from the available subset.
        for (_when, pred) in self.predicates.into_iter() {
            let is_available = pred.support().into_iter().all(|i| available_expr[i]);
            if is_available {
                before_pred.push(pred);
            } else {
                demanded.extend(pred.support());
                after_pred.push(pred);
            }
        }

        // Map from prior output location to location in un-projected `before`.
        // This map is used to correct references in `before` but it should be
        // adjusted to reflect `before`s projection prior to use in `after`.
        let mut before_map = available;
        // Input columns include any additional undescribed columns that may
        // not be captured by the `available` argument, so we must independently
        // track the current number of columns (vs relying on `before_map.len()`).
        let mut input_columns = input_arity;
        for index in self.input_arity..available_expr.len() {
            if available_expr[index] {
                before_map.insert(index, input_columns);
                input_columns += 1;
            }
        }

        // Permute the column references in `before` expressions and predicates.
        for expr in before_expr.iter_mut() {
            expr.permute_map(&before_map);
        }
        for pred in before_pred.iter_mut() {
            pred.permute_map(&before_map);
        }

        // Demand information determines `before`s output projection.
        // Specifically, we produce all input columns in the output, as well as
        // any columns that are available and demanded.
        before_proj.extend(0..input_arity);
        for index in self.input_arity..available_expr.len() {
            // If an intermediate result is both available and demanded,
            // we should produce it as output.
            if available_expr[index] && demanded.contains(&index) {
                // Use the new location of `index`.
                before_proj.push(before_map[&index]);
            }
        }

        // Map from prior output locations to location in post-`before` columns.
        // This map is used to correct references in `after`.
        // The presumption is that `after` will be presented with all input columns,
        // followed by the output columns introduced by `before` in order.
        let mut after_map = BTreeMap::new();
        for index in 0..self.input_arity {
            after_map.insert(index, index);
        }
        for index in self.input_arity..available_expr.len() {
            // If an intermediate result is both available and demanded,
            // it was produced as output.
            if available_expr[index] && demanded.contains(&index) {
                // We expect to find the output as far after `self.input_arity` as
                // it was produced after `input_arity` in the output of `before`.
                //
                // The found position cannot be less than `input_arity` (so the inner
                // subtraction cannot underflow) because `before_proj` starts with the
                // `input_arity` input columns and mapped columns (whose `before_map`
                // locations are at least `input_arity`) are appended after them.
                let location = self.input_arity
                    + (before_proj
                        .iter()
                        .position(|x| x == &before_map[&index])
                        .unwrap()
                        - input_arity);
                after_map.insert(index, location);
            }
        }
        // We must now re-map the remaining non-demanded expressions, which are
        // contiguous rather than potentially interspersed.
        for index in self.input_arity..available_expr.len() {
            if !available_expr[index] {
                after_map.insert(index, after_map.len());
            }
        }

        // Permute the column references in `after` expressions and predicates.
        for expr in after_expr.iter_mut() {
            expr.permute_map(&after_map);
        }
        for pred in after_pred.iter_mut() {
            pred.permute_map(&after_map);
        }
        // Populate `after` projection with the new locations of `self.projection`.
        for index in self.projection {
            after_proj.push(after_map[&index]);
        }

        // Form and return the before and after MapFilterProject instances.
        let before = Self::new(input_arity)
            .map(before_expr)
            .filter(before_pred)
            .project(before_proj.clone());
        let after = Self::new(self.input_arity + (before_proj.len() - input_arity))
            .map(after_expr)
            .filter(after_pred)
            .project(after_proj);
        (before, after)
    }
855
856    /// Lists input columns whose values are used in outputs.
857    ///
858    /// It is entirely appropriate to determine the demand of an instance
859    /// and then both apply a projection to the subject of the instance and
860    /// `self.permute` this instance.
861    pub fn demand(&self) -> BTreeSet<usize> {
862        let mut demanded = BTreeSet::new();
863        for (_index, pred) in self.predicates.iter() {
864            demanded.extend(pred.support());
865        }
866        demanded.extend(self.projection.iter().cloned());
867        for index in (0..self.expressions.len()).rev() {
868            if demanded.contains(&(self.input_arity + index)) {
869                demanded.extend(self.expressions[index].support());
870            }
871        }
872        demanded.retain(|col| col < &self.input_arity);
873        demanded
874    }
875
876    /// Update input column references, due to an input projection or permutation.
877    ///
878    /// The `shuffle` argument remaps expected column identifiers to new locations,
879    /// with the expectation that `shuffle` describes all input columns, and so the
880    /// intermediate results will be able to start at position `shuffle.len()`.
881    ///
882    /// The supplied `shuffle` might not list columns that are not "demanded" by the
883    /// instance, and so we should ensure that `self` is optimized to not reference
884    /// columns that are not demanded.
885    pub fn permute_fn<F>(&mut self, remap: F, new_input_arity: usize)
886    where
887        F: Fn(usize) -> usize,
888    {
889        let (mut map, mut filter, mut project) = self.as_map_filter_project();
890        let map_len = map.len();
891        let action = |col: &mut usize| {
892            if self.input_arity <= *col && *col < self.input_arity + map_len {
893                *col = new_input_arity + (*col - self.input_arity);
894            } else {
895                *col = remap(*col);
896            }
897        };
898        for expr in map.iter_mut() {
899            expr.visit_columns(action);
900        }
901        for pred in filter.iter_mut() {
902            pred.visit_columns(action);
903        }
904        for proj in project.iter_mut() {
905            action(proj);
906            assert!(*proj < new_input_arity + map.len());
907        }
908        *self = Self::new(new_input_arity)
909            .map(map)
910            .filter(filter)
911            .project(project)
912    }
913}
914
915// Optimization routines.
916impl MapFilterProject {
917    /// Optimize the internal expression evaluation order.
918    ///
919    /// This method performs several optimizations that are meant to streamline
920    /// the execution of the `MapFilterProject` instance, but not to alter its
921    /// semantics. This includes extracting expressions that are used multiple
922    /// times, inlining those that are not, and removing expressions that are
923    /// unreferenced.
924    ///
925    /// This method will inline all temporal expressions, and remove any columns
926    /// that are not demanded by the output, which should transform any temporal
927    /// filters to a state where the temporal expressions exist only in the list
928    /// of predicates.
929    ///
930    /// # Example
931    ///
932    /// This example demonstrates how the re-use of one expression, converting
933    /// column 1 from a string to an integer, can be extracted and the results
934    /// shared among the two uses. This example is used for each of the steps
935    /// along the optimization path.
936    ///
937    /// ```rust
938    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
939    /// // Demonstrate extraction of common expressions (here: parsing strings).
940    /// let mut map_filter_project = MapFilterProject::new(5)
941    ///     .map(vec![
942    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(1).call_unary(func::CastStringToInt64), func::AddInt64),
943    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
944    ///     ])
945    ///     .project(vec![3,4,5,6]);
946    ///
947    /// let mut expected_optimized = MapFilterProject::new(5)
948    ///     .map(vec![
949    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
950    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(5), func::AddInt64),
951    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
952    ///     ])
953    ///     .project(vec![3,4,6,7]);
954    ///
955    /// // Optimize the expression.
956    /// map_filter_project.optimize();
957    ///
958    /// assert_eq!(
959    ///     map_filter_project,
960    ///     expected_optimized,
961    /// );
962    /// ```
963    pub fn optimize(&mut self) {
964        // Track sizes and iterate as long as they decrease.
965        let mut prev_size = None;
966        let mut self_size = usize::max_value();
967        // Continue as long as strict improvements occur.
968        while prev_size.map(|p| self_size < p).unwrap_or(true) {
969            // Lock in current size.
970            prev_size = Some(self_size);
971
972            // We have an annoying pattern of mapping literals that already exist as columns (by filters).
973            // Try to identify this pattern, of a map that introduces an expression equated to a prior column,
974            // and then replace the mapped expression by a column reference.
975            //
976            // We think this is due to `LiteralLifting`, and we might investigate removing the introduciton in
977            // the first place. The tell-tale that we see when we fix is a diff that look likes
978            //
979            // - Project (#0, #2)
980            // -   Filter (#1 = 1)
981            // -     Map (1)
982            // -       Get l0
983            // + Filter (#1 = 1)
984            // +   Get l0
985            //
986            for (index, expr) in self.expressions.iter_mut().enumerate() {
987                // If `expr` matches a filter equating it to a column < index + input_arity, rewrite it
988                for (_, predicate) in self.predicates.iter() {
989                    if let MirScalarExpr::CallBinary {
990                        func: crate::BinaryFunc::Eq(_),
991                        expr1,
992                        expr2,
993                    } = predicate
994                    {
995                        if let MirScalarExpr::Column(c, name) = &**expr1 {
996                            if *c < index + self.input_arity && &**expr2 == expr {
997                                *expr = MirScalarExpr::Column(*c, name.clone());
998                            }
999                        }
1000                        if let MirScalarExpr::Column(c, name) = &**expr2 {
1001                            if *c < index + self.input_arity && &**expr1 == expr {
1002                                *expr = MirScalarExpr::Column(*c, name.clone());
1003                            }
1004                        }
1005                    }
1006                }
1007            }
1008
1009            // Optimization memoizes individual `ScalarExpr` expressions that
1010            // are sure to be evaluated, canonicalizes references to the first
1011            // occurrence of each, inlines expressions that have a reference
1012            // count of one, and then removes any expressions that are not
1013            // referenced.
1014            self.memoize_expressions();
1015            self.predicates.sort();
1016            self.predicates.dedup();
1017            self.inline_expressions();
1018            self.remove_undemanded();
1019
1020            // Re-build `self` from parts to restore evaluation order invariants.
1021            let (map, filter, project) = self.as_map_filter_project();
1022            *self = Self::new(self.input_arity)
1023                .map(map)
1024                .filter(filter)
1025                .project(project);
1026
1027            self_size = self.size();
1028        }
1029    }
1030
1031    /// Total expression sizes across all expressions.
1032    pub fn size(&self) -> usize {
1033        self.expressions.iter().map(|e| e.size()).sum::<usize>()
1034            + self.predicates.iter().map(|(_, e)| e.size()).sum::<usize>()
1035    }
1036
1037    /// Place each certainly evaluated expression in its own column.
1038    ///
1039    /// This method places each non-trivial, certainly evaluated expression
1040    /// in its own column, and deduplicates them so that all references to
1041    /// the same expression reference the same column.
1042    ///
1043    /// This transformation is restricted to expressions we are certain will
1044    /// be evaluated, which does not include expressions in `if` statements.
1045    ///
1046    /// # Example
1047    ///
1048    /// This example demonstrates how memoization notices `MirScalarExpr`s
1049    /// that are used multiple times, and ensures that each are extracted
1050    /// into columns and then referenced by column. This pass does not try
1051    /// to minimize the occurrences of column references, which will happen
1052    /// in inlining.
1053    ///
1054    /// ```rust
1055    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1056    /// // Demonstrate extraction of common expressions (here: parsing strings).
1057    /// let mut map_filter_project = MapFilterProject::new(5)
1058    ///     .map(vec![
1059    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(1).call_unary(func::CastStringToInt64), func::AddInt64),
1060    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1061    ///     ])
1062    ///     .project(vec![3,4,5,6]);
1063    ///
1064    /// let mut expected_optimized = MapFilterProject::new(5)
1065    ///     .map(vec![
1066    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1067    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1068    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), func::AddInt64),
1069    ///         MirScalarExpr::column(7),
1070    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1071    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), func::AddInt64),
1072    ///         MirScalarExpr::column(10),
1073    ///     ])
1074    ///     .project(vec![3,4,8,11]);
1075    ///
1076    /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1077    /// map_filter_project.memoize_expressions();
1078    ///
1079    /// assert_eq!(
1080    ///     map_filter_project,
1081    ///     expected_optimized,
1082    /// );
1083    /// ```
1084    ///
1085    /// Expressions may not be memoized if they are not certain to be evaluated,
1086    /// for example if they occur in conditional branches of a `MirScalarExpr::If`.
1087    ///
1088    /// ```rust
1089    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1090    /// // Demonstrate extraction of unconditionally evaluated expressions, as well as
1091    /// // the non-extraction of common expressions guarded by conditions.
1092    /// let mut map_filter_project = MapFilterProject::new(2)
1093    ///     .map(vec![
1094    ///         MirScalarExpr::If {
1095    ///             cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt)),
1096    ///             then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1097    ///             els:  Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1098    ///         },
1099    ///         MirScalarExpr::If {
1100    ///             cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt)),
1101    ///             then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1102    ///             els:  Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1103    ///         },
1104    ///     ]);
1105    ///
1106    /// let mut expected_optimized = MapFilterProject::new(2)
1107    ///     .map(vec![
1108    ///         MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt),
1109    ///         MirScalarExpr::If {
1110    ///             cond: Box::new(MirScalarExpr::column(2)),
1111    ///             then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1112    ///             els:  Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1113    ///         },
1114    ///         MirScalarExpr::column(3),
1115    ///         MirScalarExpr::If {
1116    ///             cond: Box::new(MirScalarExpr::column(2)),
1117    ///             then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1118    ///             els:  Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1119    ///         },
1120    ///         MirScalarExpr::column(5),
1121    ///     ])
1122    ///     .project(vec![0,1,4,6]);
1123    ///
1124    /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1125    /// map_filter_project.memoize_expressions();
1126    ///
1127    /// assert_eq!(
1128    ///     map_filter_project,
1129    ///     expected_optimized,
1130    /// );
1131    /// ```
    pub fn memoize_expressions(&mut self) {
        // Record the mapping from starting column references to new column
        // references.
        let mut remaps = BTreeMap::new();
        for index in 0..self.input_arity {
            remaps.insert(index, index);
        }
        let mut new_expressions = Vec::new();

        // We follow the same order as for evaluation, to ensure that all
        // column references exist in time for their evaluation. We could
        // prioritize predicates, but we would need to be careful to chase
        // down column references to expressions and memoize those as well.
        let mut expression = 0;
        for (support, predicate) in self.predicates.iter_mut() {
            // First memoize every expression at a column position below this
            // predicate's recorded evaluation position `support`, as those
            // must be evaluated before the predicate.
            while self.input_arity + expression < *support {
                self.expressions[expression].permute_map(&remaps);
                memoize_expr(
                    &mut self.expressions[expression],
                    &mut new_expressions,
                    self.input_arity,
                );
                // The expression's new home is the end of `new_expressions`;
                // `memoize_expr` may have appended sub-expressions before it,
                // which is why we cannot reuse the old position.
                remaps.insert(
                    self.input_arity + expression,
                    self.input_arity + new_expressions.len(),
                );
                new_expressions.push(self.expressions[expression].clone());
                expression += 1;
            }
            predicate.permute_map(&remaps);
            memoize_expr(predicate, &mut new_expressions, self.input_arity);
        }
        // Memoize any remaining expressions not forced by a predicate.
        while expression < self.expressions.len() {
            self.expressions[expression].permute_map(&remaps);
            memoize_expr(
                &mut self.expressions[expression],
                &mut new_expressions,
                self.input_arity,
            );
            remaps.insert(
                self.input_arity + expression,
                self.input_arity + new_expressions.len(),
            );
            new_expressions.push(self.expressions[expression].clone());
            expression += 1;
        }

        self.expressions = new_expressions;
        // Rewrite the projection to the relocated column positions.
        for proj in self.projection.iter_mut() {
            *proj = remaps[proj];
        }

        // Restore predicate order invariants.
        // Each predicate is keyed by one more than the greatest column it
        // references (or zero if it references none), i.e. the earliest
        // point at which it can be evaluated.
        for (pos, pred) in self.predicates.iter_mut() {
            *pos = pred.support().into_iter().max().map(|x| x + 1).unwrap_or(0);
        }
    }
1189
1190    /// This method inlines expressions with a single use.
1191    ///
1192    /// This method only inlines expressions; it does not delete expressions
1193    /// that are no longer referenced. The `remove_undemanded()` method does
1194    /// that, and should likely be used after this method.
1195    ///
1196    /// Inlining replaces column references when the referred-to item is either
1197    /// another column reference, or the only referrer of its referent. This
1198    /// is most common after memoization has atomized all expressions to seek
1199    /// out re-use: inlining re-assembles expressions that were not helpfully
1200    /// shared with other expressions.
1201    ///
1202    /// # Example
1203    ///
1204    /// In this example, we see that with only a single reference to columns
1205    /// 0 and 2, their parsing can each be inlined. Similarly, column references
1206    /// can be cleaned up among expressions, and in the final projection.
1207    ///
1208    /// Also notice the remaining expressions, which can be cleaned up in a later
1209    /// pass (the `remove_undemanded` method).
1210    ///
1211    /// ```rust
1212    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
    /// // Use the output from the first `memoize_expressions` example.
1214    /// let mut map_filter_project = MapFilterProject::new(5)
1215    ///     .map(vec![
1216    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1217    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1218    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), func::AddInt64),
1219    ///         MirScalarExpr::column(7),
1220    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1221    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), func::AddInt64),
1222    ///         MirScalarExpr::column(10),
1223    ///     ])
1224    ///     .project(vec![3,4,8,11]);
1225    ///
1226    /// let mut expected_optimized = MapFilterProject::new(5)
1227    ///     .map(vec![
1228    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1229    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1230    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1231    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1232    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1233    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1234    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1235    ///     ])
1236    ///     .project(vec![3,4,8,11]);
1237    ///
1238    /// // Inline expressions that are referenced only once.
1239    /// map_filter_project.inline_expressions();
1240    ///
1241    /// assert_eq!(
1242    ///     map_filter_project,
1243    ///     expected_optimized,
1244    /// );
1245    /// ```
    pub fn inline_expressions(&mut self) {
        // Local copy of input_arity to avoid borrowing `self` in closures.
        let input_arity = self.input_arity;
        // Reference counts track the number of places that a reference occurs.
        let mut reference_count = vec![0; input_arity + self.expressions.len()];
        // Increment reference counts for each use
        for expr in self.expressions.iter() {
            expr.visit_pre(|e| {
                if let MirScalarExpr::Column(i, _name) = e {
                    reference_count[*i] += 1;
                }
            });
        }
        for (_, pred) in self.predicates.iter() {
            pred.visit_pre(|e| {
                if let MirScalarExpr::Column(i, _name) = e {
                    reference_count[*i] += 1;
                }
            });
        }
        for proj in self.projection.iter() {
            reference_count[*proj] += 1;
        }

        // Determine which expressions should be inlined because they reference temporal expressions.
        let mut is_temporal = vec![false; input_arity];
        for expr in self.expressions.iter() {
            // An expression may contain a temporal expression, or reference a column containing such.
            is_temporal.push(
                expr.contains_temporal() || expr.support().into_iter().any(|col| is_temporal[col]),
            );
        }

        // Inline only those columns that 1. are expressions not inputs, and
        // 2a. are column references or literals or 2b. have a refcount of 1,
        // or 2c. reference temporal expressions (which cannot be evaluated).
        //
        // We iterate in reverse so that a column reference's contribution to
        // its referent's count (below) is applied before the referent itself
        // is considered, since expressions only reference earlier columns.
        let mut should_inline = vec![false; reference_count.len()];
        for i in (input_arity..reference_count.len()).rev() {
            if let MirScalarExpr::Column(c, _) = self.expressions[i - input_arity] {
                should_inline[i] = true;
                // The reference count of the referenced column should be
                // incremented with the number of references
                // `self.expressions[i - input_arity]` has.
                // Subtract 1 because `self.expressions[i - input_arity]` is
                // itself a reference.
                reference_count[c] += reference_count[i] - 1;
            } else {
                should_inline[i] = reference_count[i] == 1 || is_temporal[i];
            }
        }
        // Inline expressions per `should_inline`.
        self.perform_inlining(should_inline);
        // We can only inline column references in `self.projection`, but we should.
        for proj in self.projection.iter_mut() {
            if *proj >= self.input_arity {
                if let MirScalarExpr::Column(i, _) = self.expressions[*proj - self.input_arity] {
                    // TODO(mgree) !!! propagate name information to projection
                    *proj = i;
                }
            }
        }
    }
1308
1309    /// Inlines those expressions that are indicated by should_inline.
1310    /// See `inline_expressions` for usage.
1311    pub fn perform_inlining(&mut self, should_inline: Vec<bool>) {
1312        for index in 0..self.expressions.len() {
1313            let (prior, expr) = self.expressions.split_at_mut(index);
1314            #[allow(deprecated)]
1315            expr[0].visit_mut_post_nolimit(&mut |e| {
1316                if let MirScalarExpr::Column(i, _name) = e {
1317                    if should_inline[*i] {
1318                        *e = prior[*i - self.input_arity].clone();
1319                    }
1320                }
1321            });
1322        }
1323        for (_index, pred) in self.predicates.iter_mut() {
1324            let expressions = &self.expressions;
1325            #[allow(deprecated)]
1326            pred.visit_mut_post_nolimit(&mut |e| {
1327                if let MirScalarExpr::Column(i, _name) = e {
1328                    if should_inline[*i] {
1329                        *e = expressions[*i - self.input_arity].clone();
1330                    }
1331                }
1332            });
1333        }
1334    }
1335
1336    /// Removes unused expressions from `self.expressions`.
1337    ///
1338    /// Expressions are "used" if they are relied upon by any output columns
1339    /// or any predicates, even transitively. Any expressions that are not
1340    /// relied upon in this way can be discarded.
1341    ///
1342    /// # Example
1343    ///
1344    /// ```rust
1345    /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1346    /// // Use the output from `inline_expression` example.
1347    /// let mut map_filter_project = MapFilterProject::new(5)
1348    ///     .map(vec![
1349    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1350    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1351    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1352    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1353    ///         MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1354    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1355    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1356    ///     ])
1357    ///     .project(vec![3,4,8,11]);
1358    ///
1359    /// let mut expected_optimized = MapFilterProject::new(5)
1360    ///     .map(vec![
1361    ///         MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1362    ///         MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(5), func::AddInt64),
1363    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1364    ///     ])
1365    ///     .project(vec![3,4,6,7]);
1366    ///
1367    /// // Remove undemanded expressions, streamlining the work done..
1368    /// map_filter_project.remove_undemanded();
1369    ///
1370    /// assert_eq!(
1371    ///     map_filter_project,
1372    ///     expected_optimized,
1373    /// );
1374    /// ```
1375    pub fn remove_undemanded(&mut self) {
1376        // Determine the demanded expressions to remove irrelevant ones.
1377        let mut demand = BTreeSet::new();
1378        for (_index, pred) in self.predicates.iter() {
1379            demand.extend(pred.support());
1380        }
1381        // Start from the output columns as presumed demanded.
1382        // If this is not the case, the caller should project some away.
1383        demand.extend(self.projection.iter().cloned());
1384        // Proceed in *reverse* order, as expressions may depend on other
1385        // expressions that precede them.
1386        for index in (0..self.expressions.len()).rev() {
1387            if demand.contains(&(self.input_arity + index)) {
1388                demand.extend(self.expressions[index].support());
1389            }
1390        }
1391
1392        // Maintain a map from initial column identifiers to locations
1393        // once we have removed undemanded expressions.
1394        let mut remap = BTreeMap::new();
1395        // This map only needs to map elements of `demand` to a new location,
1396        // but the logic is easier if we include all input columns (as the
1397        // new position is then determined by the size of the map).
1398        for index in 0..self.input_arity {
1399            remap.insert(index, index);
1400        }
1401        // Retain demanded expressions, and record their new locations.
1402        let mut new_expressions = Vec::new();
1403        for (index, expr) in self.expressions.drain(..).enumerate() {
1404            if demand.contains(&(index + self.input_arity)) {
1405                remap.insert(index + self.input_arity, remap.len());
1406                new_expressions.push(expr);
1407            }
1408        }
1409        self.expressions = new_expressions;
1410
1411        // Update column identifiers; rebuild `Self` to re-establish any invariants.
1412        // We mirror `self.permute(&remap)` but we specifically want to remap columns
1413        // that are produced by `self.expressions` after the input columns.
1414        let (expressions, predicates, projection) = self.as_map_filter_project();
1415        *self = Self::new(self.input_arity)
1416            .map(expressions.into_iter().map(|mut e| {
1417                e.permute_map(&remap);
1418                e
1419            }))
1420            .filter(predicates.into_iter().map(|mut p| {
1421                p.permute_map(&remap);
1422                p
1423            }))
1424            .project(projection.into_iter().map(|c| remap[&c]));
1425    }
1426}
1427
// TODO: move this elsewhere?
/// Recursively memoize parts of `expr`, storing those parts in `memoized_parts`.
///
/// A part of `expr` that is memoized is replaced by a reference to column
/// `(input_arity + pos)`, where `pos` is the position of the memoized part in
/// `memoized_parts`, and `input_arity` is the arity of the input that `expr`
/// refers to.
///
/// `memoized_parts` may arrive non-empty (e.g. populated by earlier calls);
/// complex subexpressions already present there are reused rather than
/// memoized a second time.
///
/// The pre-order logic restricts traversal to children that are
/// unconditionally evaluated; the post-order logic performs the memoization.
pub fn memoize_expr(
    expr: &mut MirScalarExpr,
    memoized_parts: &mut Vec<MirScalarExpr>,
    input_arity: usize,
) {
    #[allow(deprecated)]
    expr.visit_mut_pre_post_nolimit(
        &mut |e| {
            // Pre-order: returning `Some(children)` limits descent to just
            // those children; returning `None` visits all children.

            // We should not eagerly memoize `if` branches that might not be taken.
            // TODO: Memoize expressions in the intersection of `then` and `els`.
            if let MirScalarExpr::If { cond, .. } = e {
                return Some(vec![cond]);
            }

            // We should not eagerly memoize `COALESCE` expressions after the first,
            // as they are only meant to be evaluated if the preceding expressions
            // evaluate to NULL. We could memoize any preceding expressions that
            // are certain not to error.
            if let MirScalarExpr::CallVariadic {
                func: crate::VariadicFunc::Coalesce(_),
                exprs,
            } = e
            {
                return Some(exprs.iter_mut().take(1).collect());
            }

            // We should not deconstruct temporal filters, because `MfpPlan::create_from` expects
            // those to be in a specific form. However, we _should_ attend to the expression that is
            // on the opposite side of mz_now(), because it might be a complex expression in itself,
            // and is ok to deconstruct.
            if let Some((_func, other_side)) = e.as_mut_temporal_filter().ok() {
                return Some(vec![other_side]);
            }

            None
        },
        &mut |e| {
            // Post-order: children have already been memoized, so any complex
            // expression here is built from column references and literals.
            match e {
                MirScalarExpr::Literal(_, _) => {
                    // Literals do not need to be memoized.
                }
                MirScalarExpr::Column(col, _) => {
                    // Column references do not need to be memoized, but may need to be
                    // updated if they reference a column reference themselves.
                    // NOTE(review): the `>` test skips `*col == input_arity`
                    // (i.e. `memoized_parts[0]`); confirm whether `>=` was
                    // intended. As written this at most misses a chance to
                    // canonicalize a reference, not a correctness issue.
                    if *col > input_arity {
                        if let MirScalarExpr::Column(col2, _) = memoized_parts[*col - input_arity] {
                            // We do _not_ propagate column names, since mis-associating names and column
                            // references will be very confusing (and possibly bug-inducing).
                            *col = col2;
                        }
                    }
                }
                _ => {
                    // TODO: OOO (Optimizer Optimization Opportunity):
                    // we are quadratic in expression size because of this .iter().position
                    if let Some(position) = memoized_parts.iter().position(|e2| e2 == e) {
                        // Any complex expression that already exists as a prior column can
                        // be replaced by a reference to that column.
                        *e = MirScalarExpr::column(input_arity + position);
                    } else {
                        // A complex expression that does not exist should be memoized, and
                        // replaced by a reference to the column.
                        memoized_parts.push(std::mem::replace(
                            e,
                            MirScalarExpr::column(input_arity + memoized_parts.len()),
                        ));
                    }
                }
            }
        },
    )
}
1507
1508pub mod util {
1509    use std::collections::BTreeMap;
1510
1511    use crate::MirScalarExpr;
1512
1513    #[allow(dead_code)]
1514    /// A triple of actions that map from rows to (key, val) pairs and back again.
1515    struct KeyValRowMapping {
1516        /// Expressions to apply to a row to produce key datums.
1517        to_key: Vec<MirScalarExpr>,
1518        /// Columns to project from a row to produce residual value datums.
1519        to_val: Vec<usize>,
1520        /// Columns to project from the concatenation of key and value to reconstruct the row.
1521        to_row: Vec<usize>,
1522    }
1523
1524    /// Derive supporting logic to support transforming rows to (key, val) pairs,
1525    /// and back again.
1526    ///
1527    /// We are given as input a list of key expressions and an input arity, and the
1528    /// requirement the produced key should be the application of the key expressions.
1529    /// To produce the `val` output, we will identify those input columns not found in
1530    /// the key expressions, and name all other columns.
1531    /// To reconstitute the original row, we identify the sequence of columns from the
1532    /// concatenation of key and val which would reconstruct the original row.
1533    ///
1534    /// The output is a pair of column sequences, the first used to reconstruct a row
1535    /// from the concatenation of key and value, and the second to identify the columns
1536    /// of a row that should become the value associated with its key.
1537    ///
1538    /// The permutations and thinning expressions generated here will be tracked in
1539    /// `dataflow::plan::AvailableCollections`; see the
1540    /// documentation there for more details.
1541    pub fn permutation_for_arrangement(
1542        key: &[MirScalarExpr],
1543        unthinned_arity: usize,
1544    ) -> (Vec<usize>, Vec<usize>) {
1545        let columns_in_key: BTreeMap<_, _> = key
1546            .iter()
1547            .enumerate()
1548            .filter_map(|(i, key_col)| key_col.as_column().map(|c| (c, i)))
1549            .collect();
1550        let mut input_cursor = key.len();
1551        let permutation = (0..unthinned_arity)
1552            .map(|c| {
1553                if let Some(c) = columns_in_key.get(&c) {
1554                    // Column is in key (and thus gone from the value
1555                    // of the thinned representation)
1556                    *c
1557                } else {
1558                    // Column remains in value of the thinned representation
1559                    input_cursor += 1;
1560                    input_cursor - 1
1561                }
1562            })
1563            .collect();
1564        let thinning = (0..unthinned_arity)
1565            .filter(|c| !columns_in_key.contains_key(c))
1566            .collect();
1567        (permutation, thinning)
1568    }
1569
1570    /// Given the permutations (see [`permutation_for_arrangement`] and
1571    /// (`dataflow::plan::AvailableCollections`) corresponding to two
1572    /// collections with the same key arity,
1573    /// computes the permutation for the result of joining them.
1574    pub fn join_permutations(
1575        key_arity: usize,
1576        stream_permutation: Vec<usize>,
1577        thinned_stream_arity: usize,
1578        lookup_permutation: Vec<usize>,
1579    ) -> BTreeMap<usize, usize> {
1580        let stream_arity = stream_permutation.len();
1581        let lookup_arity = lookup_permutation.len();
1582
1583        (0..stream_arity + lookup_arity)
1584            .map(|i| {
1585                let location = if i < stream_arity {
1586                    stream_permutation[i]
1587                } else {
1588                    let location_in_lookup = lookup_permutation[i - stream_arity];
1589                    if location_in_lookup < key_arity {
1590                        location_in_lookup
1591                    } else {
1592                        location_in_lookup + thinned_stream_arity
1593                    }
1594                };
1595                (i, location)
1596            })
1597            .collect()
1598    }
1599}
1600
pub mod plan {
    use std::iter;

    use mz_repr::{Datum, Diff, Row, RowArena};
    use serde::{Deserialize, Serialize};

    use crate::{BinaryFunc, EvalError, MapFilterProject, MirScalarExpr, UnaryFunc, func};

    /// A wrapper type which indicates it is safe to simply evaluate all expressions.
    ///
    /// Instances are expected to contain no temporal predicates; in particular,
    /// `MfpPlan::create_from` extracts temporal predicates before wrapping the
    /// remainder in this type.
    #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
    pub struct SafeMfpPlan {
        /// The wrapped operator, evaluated directly by the methods below.
        pub(crate) mfp: MapFilterProject,
    }

    impl SafeMfpPlan {
        /// Remaps references to input columns according to `remap`.
        ///
        /// Leaves other column references, e.g. to newly mapped columns, unchanged.
        pub fn permute_fn<F>(&mut self, remap: F, new_arity: usize)
        where
            F: Fn(usize) -> usize,
        {
            self.mfp.permute_fn(remap, new_arity);
        }
        /// Evaluates the linear operator on a supplied list of datums.
        ///
        /// The arguments are the initial datums associated with the row,
        /// and an appropriately lifetimed arena for temporary allocations
        /// needed by scalar evaluation.
        ///
        /// An `Ok` result will either be `None` if any predicate did not
        /// evaluate to `Datum::True`, or the values of the columns listed
        /// by `self.projection` if all predicates passed. If an error
        /// occurs in the evaluation it is returned as an `Err` variant.
        /// As the evaluation exits early with failed predicates, it may
        /// miss some errors that would occur later in evaluation.
        ///
        /// The `row` is not cleared first, but emptied if the function
        /// returns `Ok(Some(row))`.
        #[inline(always)]
        pub fn evaluate_into<'a, 'row>(
            &'a self,
            datums: &mut Vec<Datum<'a>>,
            arena: &'a RowArena,
            row_buf: &'row mut Row,
        ) -> Result<Option<&'row Row>, EvalError> {
            let passed_predicates = self.evaluate_inner(datums, arena)?;
            if !passed_predicates {
                Ok(None)
            } else {
                // Pack the projected columns into the caller-provided buffer.
                row_buf
                    .packer()
                    .extend(self.mfp.projection.iter().map(|c| datums[*c]));
                Ok(Some(row_buf))
            }
        }

        /// A version of `evaluate` which produces an iterator over `Datum`
        /// as output.
        ///
        /// This version can be useful when one wants to capture the resulting
        /// datums without packing and then unpacking a row.
        #[inline(always)]
        pub fn evaluate_iter<'b, 'a: 'b>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
        ) -> Result<Option<impl Iterator<Item = Datum<'a>> + 'b>, EvalError> {
            let passed_predicates = self.evaluate_inner(datums, arena)?;
            if !passed_predicates {
                Ok(None)
            } else {
                Ok(Some(self.mfp.projection.iter().map(move |i| datums[*i])))
            }
        }

        /// Populates `datums` with `self.expressions` and tests `self.predicates`.
        ///
        /// This does not apply `self.projection`, which is up to the calling method.
        ///
        /// Returns `Ok(true)` when every predicate evaluates to `Datum::True`,
        /// and `Ok(false)` as soon as one does not.
        pub fn evaluate_inner<'b, 'a: 'b>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
        ) -> Result<bool, EvalError> {
            let mut expression = 0;
            // Each predicate carries the number of columns it requires;
            // evaluate only as many expressions as each predicate needs, so
            // a failing predicate skips work (and potential errors) that
            // later expressions would incur.
            for (support, predicate) in self.mfp.predicates.iter() {
                while self.mfp.input_arity + expression < *support {
                    datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
                    expression += 1;
                }
                if predicate.eval(&datums[..], arena)? != Datum::True {
                    return Ok(false);
                }
            }
            // All predicates passed; evaluate any remaining expressions so
            // the projection can reference them.
            while expression < self.mfp.expressions.len() {
                datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
                expression += 1;
            }
            Ok(true)
        }

        /// Returns true if evaluation could introduce an error on non-error inputs.
        pub fn could_error(&self) -> bool {
            self.mfp.predicates.iter().any(|(_pos, e)| e.could_error())
                || self.mfp.expressions.iter().any(|e| e.could_error())
        }

        /// Returns true when `Self` is the identity.
        pub fn is_identity(&self) -> bool {
            self.mfp.is_identity()
        }
    }

    impl std::ops::Deref for SafeMfpPlan {
        type Target = MapFilterProject;
        fn deref(&self) -> &Self::Target {
            &self.mfp
        }
    }

    /// Predicates partitioned into temporal and non-temporal.
    ///
    /// Temporal predicates require some recognition to determine their
    /// structure, and it is best to do that once and re-use the results.
    ///
    /// There are restrictions on the temporal predicates we currently support.
    /// They must directly constrain `MzNow` from below or above,
    /// by expressions that do not themselves contain `MzNow`.
    /// Conjunctions of such constraints are also ok.
    #[derive(Clone, Debug, PartialEq)]
    pub struct MfpPlan {
        /// Normal predicates to evaluate on `&[Datum]` and expect `Ok(Datum::True)`.
        pub(crate) mfp: SafeMfpPlan,
        /// Expressions that when evaluated lower-bound `MzNow`.
        pub(crate) lower_bounds: Vec<MirScalarExpr>,
        /// Expressions that when evaluated upper-bound `MzNow`.
        pub(crate) upper_bounds: Vec<MirScalarExpr>,
    }

    impl MfpPlan {
        /// Partitions `predicates` into non-temporal, and lower and upper temporal bounds.
        ///
        /// The first returned list is of predicates that do not contain `mz_now`.
        /// The second and third returned lists contain expressions that, once evaluated, lower
        /// and upper bound the validity interval of a record, respectively. These second two
        /// lists are populated only by binary expressions of the form
        /// ```ignore
        /// mz_now cmp_op expr
        /// ```
        /// where `cmp_op` is a comparison operator and `expr` does not contain `mz_now`.
        ///
        /// If any unsupported expression is found, for example one that uses `mz_now`
        /// in an unsupported position, an error is returned.
        pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, String> {
            let mut lower_bounds = Vec::new();
            let mut upper_bounds = Vec::new();

            let mut temporal = Vec::new();

            // Optimize, to ensure that temporal predicates are moved into `mfp.predicates`.
            mfp.optimize();

            // Pull the temporal predicates out of `mfp`; the remainder is safe
            // to evaluate directly.
            mfp.predicates.retain(|(_position, predicate)| {
                if predicate.contains_temporal() {
                    temporal.push(predicate.clone());
                    false
                } else {
                    true
                }
            });

            for mut predicate in temporal.into_iter() {
                // Errors for predicates not of the recognized temporal shape.
                let (func, expr2) = predicate.as_mut_temporal_filter()?;
                let expr2 = expr2.clone();

                // LogicalTimestamp <OP> <EXPR2> for several supported operators.
                // Upper bounds are exclusive, so inclusive comparisons step the
                // bound to the next timestamp.
                match func {
                    BinaryFunc::Eq(_) => {
                        lower_bounds.push(expr2.clone());
                        upper_bounds.push(
                            expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
                        );
                    }
                    BinaryFunc::Lt(_) => {
                        upper_bounds.push(expr2.clone());
                    }
                    BinaryFunc::Lte(_) => {
                        upper_bounds.push(
                            expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
                        );
                    }
                    BinaryFunc::Gt(_) => {
                        lower_bounds.push(
                            expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
                        );
                    }
                    BinaryFunc::Gte(_) => {
                        lower_bounds.push(expr2.clone());
                    }
                    _ => {
                        return Err(format!("Unsupported binary temporal operation: {:?}", func));
                    }
                }
            }

            Ok(Self {
                mfp: SafeMfpPlan { mfp },
                lower_bounds,
                upper_bounds,
            })
        }

        /// Indicates if the planned `MapFilterProject` emits exactly its inputs as outputs.
        pub fn is_identity(&self) -> bool {
            self.mfp.mfp.is_identity()
                && self.lower_bounds.is_empty()
                && self.upper_bounds.is_empty()
        }

        /// Returns `self`, and leaves behind an identity operator that acts on its output.
        pub fn take(&mut self) -> Self {
            // The identity operator's arity equals this plan's output arity.
            let mut identity = Self {
                mfp: SafeMfpPlan {
                    mfp: MapFilterProject::new(self.mfp.projection.len()),
                },
                lower_bounds: Default::default(),
                upper_bounds: Default::default(),
            };
            std::mem::swap(self, &mut identity);
            identity
        }

        /// Attempt to convert self into a non-temporal MapFilterProject plan.
        ///
        /// If that is not possible, the original instance is returned as an error.
        #[allow(clippy::result_large_err)]
        pub fn into_nontemporal(self) -> Result<SafeMfpPlan, Self> {
            if self.lower_bounds.is_empty() && self.upper_bounds.is_empty() {
                Ok(self.mfp)
            } else {
                Err(self)
            }
        }

        /// Returns an iterator over mutable references to all non-temporal
        /// scalar expressions in the plan.
        ///
        /// The order of iteration is unspecified.
        pub fn iter_nontemporal_exprs(&mut self) -> impl Iterator<Item = &mut MirScalarExpr> {
            iter::empty()
                .chain(self.mfp.mfp.predicates.iter_mut().map(|(_, expr)| expr))
                .chain(&mut self.mfp.mfp.expressions)
                .chain(&mut self.lower_bounds)
                .chain(&mut self.upper_bounds)
        }

        /// Evaluate the predicates, temporal and non-, and return times and differences for `data`.
        ///
        /// If `self` contains only non-temporal predicates, the result will either be `(time, diff)`,
        /// or an evaluation error. If `self` contains temporal predicates, the results can be times
        /// that are greater than the input `time`, and may contain negated `diff` values.
        ///
        /// The `row_builder` is not cleared first, but emptied if the function
        /// returns an iterator with any `Ok(_)` element.
        pub fn evaluate<'b, 'a: 'b, E: From<EvalError>, V: Fn(&mz_repr::Timestamp) -> bool>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
            time: mz_repr::Timestamp,
            diff: Diff,
            valid_time: V,
            row_builder: &mut Row,
        ) -> impl Iterator<
            Item = Result<(Row, mz_repr::Timestamp, Diff), (E, mz_repr::Timestamp, Diff)>,
        > + use<E, V> {
            // Every return path below builds the same concrete iterator type:
            // an `Option` iterator chained with a second `Option`, so at most
            // two items (the lower- and upper-bound updates) are yielded.
            match self.mfp.evaluate_inner(datums, arena) {
                Err(e) => {
                    return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                }
                Ok(true) => {}
                Ok(false) => {
                    return None.into_iter().chain(None);
                }
            }

            // Lower and upper bounds.
            let mut lower_bound = time;
            let mut upper_bound = None;

            // Track whether we have seen a null in either bound, as this should
            // prevent the record from being produced at any time.
            let mut null_eval = false;

            // Advance our lower bound to be at least the result of any lower bound
            // expressions.
            for l in self.lower_bounds.iter() {
                match l.eval(datums, arena) {
                    Err(e) => {
                        return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                    }
                    Ok(Datum::MzTimestamp(d)) => {
                        lower_bound = lower_bound.max(d);
                    }
                    Ok(Datum::Null) => {
                        null_eval = true;
                    }
                    x => {
                        panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
                    }
                }
            }

            // If the lower bound exceeds our `until` frontier, it should not appear in the output.
            if !valid_time(&lower_bound) {
                return None.into_iter().chain(None);
            }

            // If there are any upper bounds, determine the minimum upper bound.
            for u in self.upper_bounds.iter() {
                // We can cease as soon as the lower and upper bounds match,
                // as the update will certainly not be produced in that case.
                if upper_bound != Some(lower_bound) {
                    match u.eval(datums, arena) {
                        Err(e) => {
                            return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                        }
                        Ok(Datum::MzTimestamp(d)) => {
                            if let Some(upper) = upper_bound {
                                upper_bound = Some(upper.min(d));
                            } else {
                                upper_bound = Some(d);
                            };
                            // Force the upper bound to be at least the lower
                            // bound. The `is_some()` test should always be true
                            // due to the above block, but maintain it here in
                            // case that changes. It's hopefully optimized away.
                            if upper_bound.is_some() && upper_bound < Some(lower_bound) {
                                upper_bound = Some(lower_bound);
                            }
                        }
                        Ok(Datum::Null) => {
                            null_eval = true;
                        }
                        x => {
                            panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
                        }
                    }
                }
            }

            // If the upper bound exceeds our `until` frontier, it should not appear in the output.
            if let Some(upper) = &mut upper_bound {
                if !valid_time(upper) {
                    upper_bound = None;
                }
            }

            // Produce an output only if the upper bound exceeds the lower bound,
            // and if we did not encounter a `null` in our evaluation.
            if Some(lower_bound) != upper_bound && !null_eval {
                row_builder
                    .packer()
                    .extend(self.mfp.mfp.projection.iter().map(|c| datums[*c]));
                // Retraction at the upper bound (if any), insertion at the lower.
                let upper_opt =
                    upper_bound.map(|upper_bound| Ok((row_builder.clone(), upper_bound, -diff)));
                let lower = Some(Ok((row_builder.clone(), lower_bound, diff)));
                lower.into_iter().chain(upper_opt)
            } else {
                None.into_iter().chain(None)
            }
        }

        /// Returns true if evaluation could introduce an error on non-error inputs.
        pub fn could_error(&self) -> bool {
            self.mfp.could_error()
                || self.lower_bounds.iter().any(|e| e.could_error())
                || self.upper_bounds.iter().any(|e| e.could_error())
        }

        /// Indicates that `Self` ignores its input to the extent that it can be evaluated on `&[]`.
        ///
        /// At the moment, this is only true if it projects away all columns and applies no filters,
        /// but it could be extended to plans that produce literals independent of the input.
        ///
        /// NOTE(review): `self.mfp.mfp.expressions` is not checked here, yet
        /// `evaluate_inner` still evaluates all expressions — presumably any
        /// expressions are undemanded (and removable) when the projection and
        /// predicates are empty; confirm against callers.
        pub fn ignores_input(&self) -> bool {
            self.lower_bounds.is_empty()
                && self.upper_bounds.is_empty()
                && self.mfp.mfp.projection.is_empty()
                && self.mfp.mfp.predicates.is_empty()
        }
    }
}