// mz_expr/linear.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9use std::collections::{BTreeMap, BTreeSet};
10use std::fmt::Display;
11
12use mz_repr::{Datum, Row};
13use serde::{Deserialize, Serialize};
14
15use crate::visit::Visit;
16use crate::{MirRelationExpr, MirScalarExpr};
17
18/// A compound operator that can be applied row-by-row.
19///
20/// This operator integrates the map, filter, and project operators.
21/// It applies a sequences of map expressions, which are allowed to
22/// refer to previous expressions, interleaved with predicates which
23/// must be satisfied for an output to be produced. If all predicates
24/// evaluate to `Datum::True` the data at the identified columns are
25/// collected and produced as output in a packed `Row`.
26///
27/// This operator is a "builder" and its contents may contain expressions
28/// that are not yet executable. For example, it may contain temporal
29/// expressions in `self.expressions`, even though this is not something
30/// we can directly evaluate. The plan creation methods will defensively
31/// ensure that the right thing happens.
32#[derive(
33 Clone,
34 Debug,
35 Eq,
36 PartialEq,
37 Serialize,
38 Deserialize,
39 Hash,
40 Ord,
41 PartialOrd
42)]
43pub struct MapFilterProject {
44 /// A sequence of expressions that should be appended to the row.
45 ///
46 /// Many of these expressions may not be produced in the output,
47 /// and may only be present as common subexpressions.
48 pub expressions: Vec<MirScalarExpr>,
49 /// Expressions that must evaluate to `Datum::True` for the output
50 /// row to be produced.
51 ///
52 /// Each entry is prepended with a column identifier indicating
53 /// the column *before* which the predicate should first be applied.
54 /// Most commonly this would be one plus the largest column identifier
55 /// in the predicate's support, but it could be larger to implement
56 /// guarded evaluation of predicates.
57 ///
58 /// This list should be sorted by the first field.
59 pub predicates: Vec<(usize, MirScalarExpr)>,
60 /// A sequence of column identifiers whose data form the output row.
61 pub projection: Vec<usize>,
62 /// The expected number of input columns.
63 ///
64 /// This is needed to ensure correct identification of newly formed
65 /// columns in the output.
66 pub input_arity: usize,
67}
68
69impl Display for MapFilterProject {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 writeln!(f, "MapFilterProject(")?;
72 writeln!(f, " expressions:")?;
73 self.expressions
74 .iter()
75 .enumerate()
76 .try_for_each(|(i, e)| writeln!(f, " #{} <- {},", i + self.input_arity, e))?;
77 writeln!(f, " predicates:")?;
78 self.predicates
79 .iter()
80 .try_for_each(|(before, p)| writeln!(f, " <before: {}> {},", before, p))?;
81 writeln!(f, " projection: {:?}", self.projection)?;
82 writeln!(f, " input_arity: {}", self.input_arity)?;
83 writeln!(f, ")")
84 }
85}
86
impl MapFilterProject {
    /// Create a no-op operator for an input of a supplied arity.
    pub fn new(input_arity: usize) -> Self {
        Self {
            expressions: Vec::new(),
            predicates: Vec::new(),
            // The identity projection: each input column passes through unchanged.
            projection: (0..input_arity).collect(),
            input_arity,
        }
    }

    /// Given two mfps, return an mfp that applies one
    /// followed by the other.
    /// Note that the arguments are in the opposite order
    /// from how function composition is usually written in mathematics.
    pub fn compose(before: Self, after: Self) -> Self {
        let (m, f, p) = after.into_map_filter_project();
        before.map(m).filter(f).project(p)
    }

    /// True if the operator describes the identity transformation.
    ///
    /// This requires no expressions, no predicates, and a projection that
    /// maps each input column to itself in order.
    pub fn is_identity(&self) -> bool {
        self.expressions.is_empty()
            && self.predicates.is_empty()
            && self.projection.len() == self.input_arity
            && self.projection.iter().enumerate().all(|(i, p)| i == *p)
    }

    /// Retain only the indicated columns in the presented order.
    ///
    /// Column references are relative to the current output (projection) of
    /// `self`; this panics if any reference is out of bounds.
    pub fn project<I>(mut self, columns: I) -> Self
    where
        I: IntoIterator<Item = usize> + std::fmt::Debug,
    {
        self.projection = columns.into_iter().map(|c| self.projection[c]).collect();
        self
    }

    /// Retain only rows satisfying these predicates.
    ///
    /// This method introduces predicates as eagerly as they can be evaluated,
    /// which may not be desired for predicates that may cause exceptions.
    /// If fine manipulation is required, the predicates can be added manually.
    pub fn filter<I>(mut self, predicates: I) -> Self
    where
        I: IntoIterator<Item = MirScalarExpr>,
    {
        for mut predicate in predicates {
            // Correct column references: incoming predicates reference output
            // columns of `self`, and must be rewritten to internal columns.
            predicate.permute(&self.projection[..]);

            // Validate column references.
            assert!(
                predicate
                    .support()
                    .into_iter()
                    .all(|c| c < self.input_arity + self.expressions.len())
            );

            // Insert predicate as eagerly as it can be evaluated:
            // just after the largest column in its support is formed.
            let max_support = predicate
                .support()
                .into_iter()
                .max()
                .map(|c| c + 1)
                .unwrap_or(0);
            self.predicates.push((max_support, predicate))
        }
        // Stable sort predicates by position at which they take effect.
        // We put literal errors at the end as a stop-gap to avoid erroring
        // before we are able to evaluate any predicates that might prevent it.
        self.predicates
            .sort_by_key(|(position, predicate)| (predicate.is_literal_err(), *position));
        self
    }

    /// Append the result of evaluating expressions to each row.
    ///
    /// Column references in the supplied expressions are relative to the
    /// current output (projection) of `self`.
    pub fn map<I>(mut self, expressions: I) -> Self
    where
        I: IntoIterator<Item = MirScalarExpr>,
    {
        for mut expression in expressions {
            // Correct column references.
            expression.permute(&self.projection[..]);

            // Validate column references.
            assert!(
                expression
                    .support()
                    .into_iter()
                    .all(|c| c < self.input_arity + self.expressions.len())
            );

            // Introduce expression and produce as output.
            self.expressions.push(expression);
            self.projection
                .push(self.input_arity + self.expressions.len() - 1);
        }

        self
    }

    /// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning.
    pub fn into_map_filter_project(self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
        // The "apply before" positions are dropped; only the predicates remain.
        let predicates = self
            .predicates
            .into_iter()
            .map(|(_pos, predicate)| predicate)
            .collect();
        (self.expressions, predicates, self.projection)
    }

    /// As the arguments to `Map`, `Filter`, and `Project` operators.
    ///
    /// In principle, this operator can be implemented as a sequence of
    /// more elemental operators, likely less efficiently.
    pub fn as_map_filter_project(&self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
        self.clone().into_map_filter_project()
    }

    /// Determines if a scalar expression must be equal to a literal datum.
    ///
    /// Scans the predicates for an equality test between `expr` and a
    /// (non-error) literal, returning the literal's datum if one is found.
    pub fn literal_constraint(&self, expr: &MirScalarExpr) -> Option<Datum<'_>> {
        for (_pos, predicate) in self.predicates.iter() {
            if let MirScalarExpr::CallBinary {
                func: crate::BinaryFunc::Eq(_),
                expr1,
                expr2,
            } = predicate
            {
                if let Some(Ok(datum1)) = expr1.as_literal() {
                    if &**expr2 == expr {
                        return Some(datum1);
                    }
                }
                if let Some(Ok(datum2)) = expr2.as_literal() {
                    if &**expr1 == expr {
                        return Some(datum2);
                    }
                }
            }
        }
        None
    }

    /// Determines if a sequence of scalar expressions must be equal to a literal row.
    ///
    /// This method returns `None` on an empty `exprs`, which might be surprising, but
    /// seems to line up with its callers' expectations of that being a non-constraint.
    /// The caller knows if `exprs` is empty, and can modify their behavior appropriately
    /// if they would rather have a literal empty row.
    pub fn literal_constraints(&self, exprs: &[MirScalarExpr]) -> Option<Row> {
        if exprs.is_empty() {
            return None;
        }
        let mut row = Row::default();
        let mut packer = row.packer();
        for expr in exprs {
            if let Some(literal) = self.literal_constraint(expr) {
                packer.push(literal);
            } else {
                // Any unconstrained expression means no literal row exists.
                return None;
            }
        }
        Some(row)
    }

    /// Extracts any MapFilterProject at the root of the expression.
    ///
    /// The expression will be modified to extract any maps, filters, and
    /// projections, which will be returned as `Self`. If there are no maps,
    /// filters, or projections the method will return an identity operator.
    ///
    /// The extracted expressions may contain temporal predicates, and one
    /// should be careful not to apply them blindly.
    pub fn extract_from_expression(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
        // TODO: This could become iterative rather than recursive if
        // we were able to fuse MFP operators from below, rather than
        // from above.
        match expr {
            MirRelationExpr::Map { input, scalars } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.map(scalars.iter().cloned()), expr)
            }
            MirRelationExpr::Filter { input, predicates } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.filter(predicates.iter().cloned()), expr)
            }
            MirRelationExpr::Project { input, outputs } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.project(outputs.iter().cloned()), expr)
            }
            // TODO: The recursion is quadratic in the number of Map/Filter/Project operators due to
            // this call to `arity()`.
            x => (Self::new(x.arity()), x),
        }
    }

    /// Extracts an error-free MapFilterProject at the root of the expression.
    ///
    /// The expression will be modified to extract maps, filters, and projects
    /// from the root of the expression, which will be returned as `Self`. The
    /// extraction will halt if a Map or Filter containing a literal error is
    /// reached. Otherwise, the method will return an identity operator.
    ///
    /// This method is meant to be used during optimization, where it is
    /// necessary to avoid moving around maps and filters with errors.
    pub fn extract_non_errors_from_expr(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
        match expr {
            MirRelationExpr::Map { input, scalars }
                if scalars.iter().all(|s| !s.is_literal_err()) =>
            {
                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
                (mfp.map(scalars.iter().cloned()), expr)
            }
            MirRelationExpr::Filter { input, predicates }
                if predicates.iter().all(|p| !p.is_literal_err()) =>
            {
                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
                (mfp.filter(predicates.iter().cloned()), expr)
            }
            MirRelationExpr::Project { input, outputs } => {
                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
                (mfp.project(outputs.iter().cloned()), expr)
            }
            x => (Self::new(x.arity()), x),
        }
    }

    /// Extracts an error-free MapFilterProject at the root of the expression.
    ///
    /// Differs from [MapFilterProject::extract_non_errors_from_expr] by taking and returning a
    /// mutable reference.
    pub fn extract_non_errors_from_expr_ref_mut(
        expr: &mut MirRelationExpr,
    ) -> (Self, &mut MirRelationExpr) {
        // This is essentially the same code as `extract_non_errors_from_expr`, except the seemingly
        // superfluous outer if, which works around a borrow-checker issue:
        // https://github.com/rust-lang/rust/issues/54663
        if matches!(
            expr,
            MirRelationExpr::Map { input: _, scalars }
                if scalars.iter().all(|s| !s.is_literal_err())
        ) || matches!(
            expr,
            MirRelationExpr::Filter { input: _, predicates }
                if predicates.iter().all(|p| !p.is_literal_err())
        ) || matches!(expr, MirRelationExpr::Project { .. })
        {
            match expr {
                MirRelationExpr::Map { input, scalars }
                    if scalars.iter().all(|s| !s.is_literal_err()) =>
                {
                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
                    (mfp.map(scalars.iter().cloned()), expr)
                }
                MirRelationExpr::Filter { input, predicates }
                    if predicates.iter().all(|p| !p.is_literal_err()) =>
                {
                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
                    (mfp.filter(predicates.iter().cloned()), expr)
                }
                MirRelationExpr::Project { input, outputs } => {
                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
                    (mfp.project(outputs.iter().cloned()), expr)
                }
                // The outer `if` above guarantees one of the arms matched.
                _ => unreachable!(),
            }
        } else {
            (Self::new(expr.arity()), expr)
        }
    }

    /// Removes an error-free MapFilterProject from the root of the expression.
    ///
    /// The expression will be modified to extract maps, filters, and projects
    /// from the root of the expression, which will be returned as `Self`. The
    /// extraction will halt if a Map or Filter containing a literal error is
    /// reached. Otherwise, the method will return an
    /// identity operator, and the expression will remain unchanged.
    ///
    /// This method is meant to be used during optimization, where it is
    /// necessary to avoid moving around maps and filters with errors.
    pub fn extract_non_errors_from_expr_mut(expr: &mut MirRelationExpr) -> Self {
        match expr {
            MirRelationExpr::Map { input, scalars }
                if scalars.iter().all(|s| !s.is_literal_err()) =>
            {
                let mfp =
                    Self::extract_non_errors_from_expr_mut(input).map(scalars.iter().cloned());
                // Replace the Map node with its input, as the map is now in `mfp`.
                *expr = input.take_dangerous();
                mfp
            }
            MirRelationExpr::Filter { input, predicates }
                if predicates.iter().all(|p| !p.is_literal_err()) =>
            {
                let mfp = Self::extract_non_errors_from_expr_mut(input)
                    .filter(predicates.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            MirRelationExpr::Project { input, outputs } => {
                let mfp =
                    Self::extract_non_errors_from_expr_mut(input).project(outputs.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            x => Self::new(x.arity()),
        }
    }

    /// Returns `true` if any predicate in this MFP contains a temporal expression (`mz_now()`).
    pub fn has_temporal_predicates(&self) -> bool {
        self.predicates
            .iter()
            .any(|(_, predicate)| predicate.contains_temporal())
    }

    /// Extracts temporal predicates into their own `Self`.
    ///
    /// Expressions that are used by the temporal predicates are exposed by `self.projection`,
    /// though there could be justification for extracting them as well if they are otherwise
    /// unused.
    ///
    /// This separation is valuable when the execution cannot be fused into one operator.
    pub fn extract_temporal(&mut self) -> Self {
        // Optimize the expression, as it is only post-optimization that we can be certain
        // that temporal expressions are restricted to filters. We could relax this in the
        // future to be only `inline_expressions` and `remove_undemanded`, but optimization
        // seems to be the best fit at the moment.
        self.optimize();

        // Assert that we no longer have temporal expressions to evaluate. This should only
        // occur if the optimization above results with temporal expressions yielded in the
        // output, which is out of spec for how the type is meant to be used.
        assert!(!self.expressions.iter().any(|e| e.contains_temporal()));

        // Extract temporal predicates from `self.predicates`.
        let mut temporal_predicates = Vec::new();
        self.predicates.retain(|(_position, predicate)| {
            if predicate.contains_temporal() {
                temporal_predicates.push(predicate.clone());
                false
            } else {
                true
            }
        });

        // Determine extended input columns used by temporal filters.
        let mut support = BTreeSet::new();
        for predicate in temporal_predicates.iter() {
            support.extend(predicate.support());
        }

        // Discover the locations of these columns after `self.projection`.
        let old_projection_len = self.projection.len();
        let mut new_location = BTreeMap::new();
        for original in support.iter() {
            if let Some(position) = self.projection.iter().position(|x| x == original) {
                // Already projected; reference its existing location.
                new_location.insert(*original, position);
            } else {
                // Not projected; append it so the temporal filter can see it.
                new_location.insert(*original, self.projection.len());
                self.projection.push(*original);
            }
        }
        // Permute references in extracted predicates to their new locations.
        for predicate in temporal_predicates.iter_mut() {
            predicate.permute_map(&new_location);
        }

        // Form a new `Self` containing the temporal predicates to return.
        // The trailing projection removes the columns appended only for the
        // temporal predicates' benefit.
        Self::new(self.projection.len())
            .filter(temporal_predicates)
            .project(0..old_projection_len)
    }

    /// Extracts common expressions from multiple `Self` into a result `Self`.
    ///
    /// The argument `mfps` are mutated so that each are functionally equivalent to their
    /// corresponding input, when composed atop the resulting `Self`.
    ///
    /// # Panics
    ///
    /// Panics if `mfps` is empty.
    pub fn extract_common(mfps: &mut [&mut Self]) -> Self {
        match mfps.len() {
            0 => {
                panic!("Cannot call method on empty arguments");
            }
            1 => {
                // With a single mfp, everything is "common": move it wholesale
                // into the result and leave an identity operator behind.
                let output_arity = mfps[0].projection.len();
                std::mem::replace(mfps[0], MapFilterProject::new(output_arity))
            }
            _ => {
                // More generally, we convert each mfp to ANF, at which point we can
                // repeatedly extract atomic expressions that depend only on input
                // columns, migrate them to an input mfp, and repeat until no such
                // expressions exist. At this point, we can also migrate predicates
                // and then determine and push down projections.

                // Prepare a return `Self`.
                let mut result_mfp = MapFilterProject::new(mfps[0].input_arity);

                // We convert each mfp to ANF, using `memoize_expressions`.
                for mfp in mfps.iter_mut() {
                    mfp.memoize_expressions();
                }

                // We repeatedly extract common expressions, until none remain.
                let mut done = false;
                while !done {
                    // We use references to determine common expressions, and must
                    // introduce a scope here to drop the borrows before mutation.
                    let common = {
                        // The input arity may increase as we iterate, so recapture.
                        let input_arity = result_mfp.projection.len();
                        // Candidates from the first mfp: expressions supported
                        // entirely by (current) input columns.
                        let mut prev: BTreeSet<_> = mfps[0]
                            .expressions
                            .iter()
                            .filter(|e| e.support().iter().max() < Some(&input_arity))
                            .collect();
                        // Intersect with the expressions of each remaining mfp.
                        let mut next = BTreeSet::default();
                        for mfp in mfps[1..].iter() {
                            for expr in mfp.expressions.iter() {
                                if prev.contains(expr) {
                                    next.insert(expr);
                                }
                            }
                            std::mem::swap(&mut prev, &mut next);
                            next.clear();
                        }
                        prev.into_iter().cloned().collect::<Vec<_>>()
                    };
                    // Without new common expressions, we should terminate the loop.
                    done = common.is_empty();

                    // Migrate each expression in `common` to `result_mfp`.
                    for expr in common.into_iter() {
                        // Update each mfp by removing expr and updating column references.
                        for mfp in mfps.iter_mut() {
                            // With `expr` next in `result_mfp`, it is as if we are rotating it to
                            // be the first expression in `mfp`, and then removing it from `mfp` and
                            // increasing the input arity of `mfp`.
                            let arity = result_mfp.projection.len();
                            let found = mfp.expressions.iter().position(|e| e == &expr).unwrap();
                            let index = arity + found;
                            // Column references change due to the rotation from `index` to `arity`.
                            let action = |c: &mut usize| {
                                if arity <= *c && *c < index {
                                    *c += 1;
                                } else if *c == index {
                                    *c = arity;
                                }
                            };
                            // Rotate `expr` from `found` to first, and then snip.
                            // Short circuit by simply removing and incrementing the input arity.
                            mfp.input_arity += 1;
                            mfp.expressions.remove(found);
                            // Update column references in expressions, predicates, and projections.
                            for e in mfp.expressions.iter_mut() {
                                e.visit_columns(action);
                            }
                            for (o, e) in mfp.predicates.iter_mut() {
                                e.visit_columns(action);
                                // Max out the offset for the predicate; optimization will correct.
                                *o = mfp.input_arity + mfp.expressions.len();
                            }
                            for c in mfp.projection.iter_mut() {
                                action(c);
                            }
                        }
                        // Install the expression and update the projection.
                        result_mfp.expressions.push(expr);
                        result_mfp.projection.push(result_mfp.projection.len());
                    }
                }
                // As before, but easier: predicates in common to all mfps.
                let common_preds: Vec<MirScalarExpr> = {
                    let input_arity = result_mfp.projection.len();
                    let mut prev: BTreeSet<_> = mfps[0]
                        .predicates
                        .iter()
                        .map(|(_, e)| e)
                        .filter(|e| e.support().iter().max() < Some(&input_arity))
                        .collect();
                    let mut next = BTreeSet::default();
                    for mfp in mfps[1..].iter() {
                        for (_, expr) in mfp.predicates.iter() {
                            if prev.contains(expr) {
                                next.insert(expr);
                            }
                        }
                        std::mem::swap(&mut prev, &mut next);
                        next.clear();
                    }
                    // Predicates in common, that we will append to `result_mfp.predicates`.
                    prev.into_iter().cloned().collect::<Vec<_>>()
                };
                for mfp in mfps.iter_mut() {
                    mfp.predicates.retain(|(_, p)| !common_preds.contains(p));
                    mfp.optimize();
                }
                result_mfp.predicates.extend(
                    common_preds
                        .into_iter()
                        .map(|e| (result_mfp.projection.len(), e)),
                );

                // Then, look for unused columns and project them away.
                let mut common_demand = BTreeSet::new();
                for mfp in mfps.iter() {
                    common_demand.extend(mfp.demand());
                }
                // Columns in `common_demand` must be retained, but others
                // may be discarded.
                let common_demand = (0..result_mfp.projection.len())
                    .filter(|x| common_demand.contains(x))
                    .collect::<Vec<_>>();
                let remap = common_demand
                    .iter()
                    .cloned()
                    .enumerate()
                    .map(|(new, old)| (old, new))
                    .collect::<BTreeMap<_, _>>();
                for mfp in mfps.iter_mut() {
                    mfp.permute_fn(|c| remap[&c], common_demand.len());
                }
                result_mfp = result_mfp.project(common_demand);

                // Return the resulting MFP.
                result_mfp.optimize();
                result_mfp
            }
        }
    }

    /// Returns `self`, and leaves behind an identity operator that acts on its output.
    pub fn take(&mut self) -> Self {
        let mut identity = Self::new(self.projection.len());
        std::mem::swap(self, &mut identity);
        identity
    }

    /// Convert the `MapFilterProject` into a staged evaluation plan.
    ///
    /// The main behavior is to extract temporal predicates, which cannot be evaluated
    /// using the standard machinery.
    pub fn into_plan(self) -> Result<plan::MfpPlan, String> {
        plan::MfpPlan::create_from(self)
    }
}
635
636impl MapFilterProject {
    /// Partitions `self` into two instances, one of which can be eagerly applied.
    ///
    /// The `available` argument indicates which input columns are available (keys)
    /// and in which positions (values). This information may allow some maps and
    /// filters to execute. The `input_arity` argument reports the total number of
    /// input columns (which may include some not present in `available`).
    ///
    /// This method partitions `self` in two parts, `(before, after)`, where `before`
    /// can be applied on columns present as keys in `available`, and `after` must
    /// await the introduction of the other input columns.
    ///
    /// The `before` instance will *append* any columns that can be determined from
    /// `available` but will project away any of these columns that are not needed by
    /// `after`. Importantly, this means that `before` will leave intact *all* input
    /// columns including those not referenced in `available`.
    ///
    /// The `after` instance will presume all input columns are available, followed
    /// by the appended columns of the `before` instance. It may be that some input
    /// columns can be projected away in `before` if `after` does not need them, but
    /// we leave that as something the caller can apply if needed (it is otherwise
    /// complicated to negotiate which input columns `before` should retain).
    ///
    /// To correctly reconstruct `self` from `before` and `after`, one must introduce
    /// additional input columns, permute all input columns to their locations as
    /// expected by `self`, follow this by new columns appended by `before`, and
    /// remove all other columns that may be present.
    ///
    /// # Example
    ///
    /// ```rust
    /// use mz_expr::{BinaryFunc, MapFilterProject, MirScalarExpr, func};
    ///
    /// // imagine an action on columns (a, b, c, d).
    /// let original = MapFilterProject::new(4).map(vec![
    ///     MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::AddInt64),
    ///     MirScalarExpr::column(2).call_binary(MirScalarExpr::column(4), func::AddInt64),
    ///     MirScalarExpr::column(3).call_binary(MirScalarExpr::column(5), func::AddInt64),
    /// ]).project(vec![6]);
    ///
    /// // Imagine we start with columns (b, x, a, y, c).
    /// //
    /// // The `partition` method requires a map from *expected* input columns to *actual*
    /// // input columns. In the example above, the columns a, b, and c exist, and are at
    /// // locations 2, 0, and 4 respectively. We must construct a map to this effect.
    /// let mut available_columns = std::collections::BTreeMap::new();
    /// available_columns.insert(0, 2);
    /// available_columns.insert(1, 0);
    /// available_columns.insert(2, 4);
    /// // Partition `original` using the available columns and current input arity.
    /// // This informs `partition` which columns are available, where they can be found,
    /// // and how many columns are not relevant but should be preserved.
    /// let (before, after) = original.partition(available_columns, 5);
    ///
    /// // `before` sees all five input columns, and should append `a + b + c`.
    /// assert_eq!(before, MapFilterProject::new(5).map(vec![
    ///     MirScalarExpr::column(2).call_binary(MirScalarExpr::column(0), func::AddInt64),
    ///     MirScalarExpr::column(4).call_binary(MirScalarExpr::column(5), func::AddInt64),
    /// ]).project(vec![0, 1, 2, 3, 4, 6]));
    ///
    /// // `after` expects to see `(a, b, c, d, a + b + c)`.
    /// assert_eq!(after, MapFilterProject::new(5).map(vec![
    ///     MirScalarExpr::column(3).call_binary(MirScalarExpr::column(4), func::AddInt64)
    /// ]).project(vec![5]));
    ///
    /// // To reconstruct `self`, we must introduce the columns that are not present,
    /// // and present them in the order intended by `self`. In this example, we must
    /// // introduce column d and permute the columns so that they begin (a, b, c, d).
    /// // The columns x and y must be projected away, and any columns introduced by
    /// // `before` must be retained in their current order.
    ///
    /// // The `after` instance expects to be provided with all inputs, but it
    /// // may not need all inputs. The `demand()` and `permute()` methods can
    /// // optimize the representation.
    /// ```
    pub fn partition(self, available: BTreeMap<usize, usize>, input_arity: usize) -> (Self, Self) {
        // Map expressions, filter predicates, and projections for `before` and `after`.
        let mut before_expr = Vec::new();
        let mut before_pred = Vec::new();
        let mut before_proj = Vec::new();
        let mut after_expr = Vec::new();
        let mut after_pred = Vec::new();
        let mut after_proj = Vec::new();

        // Track which output columns must be preserved in the output of `before`.
        let mut demanded = BTreeSet::new();
        demanded.extend(0..self.input_arity);
        demanded.extend(self.projection.iter());

        // Determine which map expressions can be computed from the available subset.
        // Some expressions may depend on other expressions, but by evaluating them
        // in forward order we should accurately determine the available expressions.
        let mut available_expr = vec![false; self.input_arity];
        // Initialize available columns from `available`, which is then not used again.
        for index in available.keys() {
            available_expr[*index] = true;
        }
        for expr in self.expressions.into_iter() {
            // We treat an expression as available if its supporting columns are available,
            // and if it is not a literal (we want to avoid pushing down literals). This
            // choice is ad-hoc, but the intent is that we partition the operators so
            // that we can reduce the row representation size and total computation.
            // Pushing down literals harms the former and does nothing for the latter.
            // In the future, we'll want to have a harder think about this trade-off, as
            // we are certainly making sub-optimal decisions by pushing down all available
            // work.
            // TODO(mcsherry): establish better principles about what work to push down.
            let is_available =
                expr.support().into_iter().all(|i| available_expr[i]) && !expr.is_literal();
            if is_available {
                before_expr.push(expr);
            } else {
                demanded.extend(expr.support());
                after_expr.push(expr);
            }
            available_expr.push(is_available);
        }

        // Determine which predicates can be computed from the available subset.
        for (_when, pred) in self.predicates.into_iter() {
            let is_available = pred.support().into_iter().all(|i| available_expr[i]);
            if is_available {
                before_pred.push(pred);
            } else {
                demanded.extend(pred.support());
                after_pred.push(pred);
            }
        }

        // Map from prior output location to location in un-projected `before`.
        // This map is used to correct references in `before` but it should be
        // adjusted to reflect `before`s projection prior to use in `after`.
        let mut before_map = available;
        // Input columns include any additional undescribed columns that may
        // not be captured by the `available` argument, so we must independently
        // track the current number of columns (vs relying on `before_map.len()`).
        let mut input_columns = input_arity;
        for index in self.input_arity..available_expr.len() {
            if available_expr[index] {
                before_map.insert(index, input_columns);
                input_columns += 1;
            }
        }

        // Permute the column references in `before` expressions and predicates.
        for expr in before_expr.iter_mut() {
            expr.permute_map(&before_map);
        }
        for pred in before_pred.iter_mut() {
            pred.permute_map(&before_map);
        }

        // Demand information determines `before`s output projection.
        // Specifically, we produce all input columns in the output, as well as
        // any columns that are available and demanded.
        before_proj.extend(0..input_arity);
        for index in self.input_arity..available_expr.len() {
            // If an intermediate result is both available and demanded,
            // we should produce it as output.
            if available_expr[index] && demanded.contains(&index) {
                // Use the new location of `index`.
                before_proj.push(before_map[&index]);
            }
        }

        // Map from prior output locations to location in post-`before` columns.
        // This map is used to correct references in `after`.
        // The presumption is that `after` will be presented with all input columns,
        // followed by the output columns introduced by `before` in order.
        let mut after_map = BTreeMap::new();
        for index in 0..self.input_arity {
            after_map.insert(index, index);
        }
        for index in self.input_arity..available_expr.len() {
            // If an intermediate result is both available and demanded,
            // it was produced as output.
            if available_expr[index] && demanded.contains(&index) {
                // We expect to find the output as far after `self.input_arity` as
                // it was produced after `input_arity` in the output of `before`.
                let location = self.input_arity
                    + (before_proj
                        .iter()
                        .position(|x| x == &before_map[&index])
                        .unwrap()
                        - input_arity);
                after_map.insert(index, location);
            }
        }
        // We must now re-map the remaining non-demanded expressions, which are
        // contiguous rather than potentially interspersed.
        for index in self.input_arity..available_expr.len() {
            if !available_expr[index] {
                after_map.insert(index, after_map.len());
            }
        }

        // Permute the column references in `after` expressions and predicates.
        for expr in after_expr.iter_mut() {
            expr.permute_map(&after_map);
        }
        for pred in after_pred.iter_mut() {
            pred.permute_map(&after_map);
        }
        // Populate `after` projection with the new locations of `self.projection`.
        for index in self.projection {
            after_proj.push(after_map[&index]);
        }

        // Form and return the before and after MapFilterProject instances.
        let before = Self::new(input_arity)
            .map(before_expr)
            .filter(before_pred)
            .project(before_proj.clone());
        let after = Self::new(self.input_arity + (before_proj.len() - input_arity))
            .map(after_expr)
            .filter(after_pred)
            .project(after_proj);
        (before, after)
    }
855
856 /// Lists input columns whose values are used in outputs.
857 ///
858 /// It is entirely appropriate to determine the demand of an instance
859 /// and then both apply a projection to the subject of the instance and
860 /// `self.permute` this instance.
861 pub fn demand(&self) -> BTreeSet<usize> {
862 let mut demanded = BTreeSet::new();
863 for (_index, pred) in self.predicates.iter() {
864 demanded.extend(pred.support());
865 }
866 demanded.extend(self.projection.iter().cloned());
867 for index in (0..self.expressions.len()).rev() {
868 if demanded.contains(&(self.input_arity + index)) {
869 demanded.extend(self.expressions[index].support());
870 }
871 }
872 demanded.retain(|col| col < &self.input_arity);
873 demanded
874 }
875
876 /// Update input column references, due to an input projection or permutation.
877 ///
878 /// The `shuffle` argument remaps expected column identifiers to new locations,
879 /// with the expectation that `shuffle` describes all input columns, and so the
880 /// intermediate results will be able to start at position `shuffle.len()`.
881 ///
882 /// The supplied `shuffle` might not list columns that are not "demanded" by the
883 /// instance, and so we should ensure that `self` is optimized to not reference
884 /// columns that are not demanded.
885 pub fn permute_fn<F>(&mut self, remap: F, new_input_arity: usize)
886 where
887 F: Fn(usize) -> usize,
888 {
889 let (mut map, mut filter, mut project) = self.as_map_filter_project();
890 let map_len = map.len();
891 let action = |col: &mut usize| {
892 if self.input_arity <= *col && *col < self.input_arity + map_len {
893 *col = new_input_arity + (*col - self.input_arity);
894 } else {
895 *col = remap(*col);
896 }
897 };
898 for expr in map.iter_mut() {
899 expr.visit_columns(action);
900 }
901 for pred in filter.iter_mut() {
902 pred.visit_columns(action);
903 }
904 for proj in project.iter_mut() {
905 action(proj);
906 assert!(*proj < new_input_arity + map.len());
907 }
908 *self = Self::new(new_input_arity)
909 .map(map)
910 .filter(filter)
911 .project(project)
912 }
913}
914
915// Optimization routines.
916impl MapFilterProject {
917 /// Optimize the internal expression evaluation order.
918 ///
919 /// This method performs several optimizations that are meant to streamline
920 /// the execution of the `MapFilterProject` instance, but not to alter its
921 /// semantics. This includes extracting expressions that are used multiple
922 /// times, inlining those that are not, and removing expressions that are
923 /// unreferenced.
924 ///
925 /// This method will inline all temporal expressions, and remove any columns
926 /// that are not demanded by the output, which should transform any temporal
927 /// filters to a state where the temporal expressions exist only in the list
928 /// of predicates.
929 ///
930 /// # Example
931 ///
932 /// This example demonstrates how the re-use of one expression, converting
933 /// column 1 from a string to an integer, can be extracted and the results
934 /// shared among the two uses. This example is used for each of the steps
935 /// along the optimization path.
936 ///
937 /// ```rust
938 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
939 /// // Demonstrate extraction of common expressions (here: parsing strings).
940 /// let mut map_filter_project = MapFilterProject::new(5)
941 /// .map(vec![
942 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(1).call_unary(func::CastStringToInt64), func::AddInt64),
943 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
944 /// ])
945 /// .project(vec![3,4,5,6]);
946 ///
947 /// let mut expected_optimized = MapFilterProject::new(5)
948 /// .map(vec![
949 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
950 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(5), func::AddInt64),
951 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
952 /// ])
953 /// .project(vec![3,4,6,7]);
954 ///
955 /// // Optimize the expression.
956 /// map_filter_project.optimize();
957 ///
958 /// assert_eq!(
959 /// map_filter_project,
960 /// expected_optimized,
961 /// );
962 /// ```
963 pub fn optimize(&mut self) {
964 // Track sizes and iterate as long as they decrease.
965 let mut prev_size = None;
966 let mut self_size = usize::max_value();
967 // Continue as long as strict improvements occur.
968 while prev_size.map(|p| self_size < p).unwrap_or(true) {
969 // Lock in current size.
970 prev_size = Some(self_size);
971
972 // We have an annoying pattern of mapping literals that already exist as columns (by filters).
973 // Try to identify this pattern, of a map that introduces an expression equated to a prior column,
974 // and then replace the mapped expression by a column reference.
975 //
976 // We think this is due to `LiteralLifting`, and we might investigate removing the introduciton in
977 // the first place. The tell-tale that we see when we fix is a diff that look likes
978 //
979 // - Project (#0, #2)
980 // - Filter (#1 = 1)
981 // - Map (1)
982 // - Get l0
983 // + Filter (#1 = 1)
984 // + Get l0
985 //
986 for (index, expr) in self.expressions.iter_mut().enumerate() {
987 // If `expr` matches a filter equating it to a column < index + input_arity, rewrite it
988 for (_, predicate) in self.predicates.iter() {
989 if let MirScalarExpr::CallBinary {
990 func: crate::BinaryFunc::Eq(_),
991 expr1,
992 expr2,
993 } = predicate
994 {
995 if let MirScalarExpr::Column(c, name) = &**expr1 {
996 if *c < index + self.input_arity && &**expr2 == expr {
997 *expr = MirScalarExpr::Column(*c, name.clone());
998 }
999 }
1000 if let MirScalarExpr::Column(c, name) = &**expr2 {
1001 if *c < index + self.input_arity && &**expr1 == expr {
1002 *expr = MirScalarExpr::Column(*c, name.clone());
1003 }
1004 }
1005 }
1006 }
1007 }
1008
1009 // Optimization memoizes individual `ScalarExpr` expressions that
1010 // are sure to be evaluated, canonicalizes references to the first
1011 // occurrence of each, inlines expressions that have a reference
1012 // count of one, and then removes any expressions that are not
1013 // referenced.
1014 self.memoize_expressions();
1015 self.predicates.sort();
1016 self.predicates.dedup();
1017 self.inline_expressions();
1018 self.remove_undemanded();
1019
1020 // Re-build `self` from parts to restore evaluation order invariants.
1021 let (map, filter, project) = self.as_map_filter_project();
1022 *self = Self::new(self.input_arity)
1023 .map(map)
1024 .filter(filter)
1025 .project(project);
1026
1027 self_size = self.size();
1028 }
1029 }
1030
1031 /// Total expression sizes across all expressions.
1032 pub fn size(&self) -> usize {
1033 self.expressions.iter().map(|e| e.size()).sum::<usize>()
1034 + self.predicates.iter().map(|(_, e)| e.size()).sum::<usize>()
1035 }
1036
1037 /// Place each certainly evaluated expression in its own column.
1038 ///
1039 /// This method places each non-trivial, certainly evaluated expression
1040 /// in its own column, and deduplicates them so that all references to
1041 /// the same expression reference the same column.
1042 ///
1043 /// This transformation is restricted to expressions we are certain will
1044 /// be evaluated, which does not include expressions in `if` statements.
1045 ///
1046 /// # Example
1047 ///
1048 /// This example demonstrates how memoization notices `MirScalarExpr`s
1049 /// that are used multiple times, and ensures that each are extracted
1050 /// into columns and then referenced by column. This pass does not try
1051 /// to minimize the occurrences of column references, which will happen
1052 /// in inlining.
1053 ///
1054 /// ```rust
1055 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1056 /// // Demonstrate extraction of common expressions (here: parsing strings).
1057 /// let mut map_filter_project = MapFilterProject::new(5)
1058 /// .map(vec![
1059 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(1).call_unary(func::CastStringToInt64), func::AddInt64),
1060 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1061 /// ])
1062 /// .project(vec![3,4,5,6]);
1063 ///
1064 /// let mut expected_optimized = MapFilterProject::new(5)
1065 /// .map(vec![
1066 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1067 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1068 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), func::AddInt64),
1069 /// MirScalarExpr::column(7),
1070 /// MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1071 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), func::AddInt64),
1072 /// MirScalarExpr::column(10),
1073 /// ])
1074 /// .project(vec![3,4,8,11]);
1075 ///
1076 /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1077 /// map_filter_project.memoize_expressions();
1078 ///
1079 /// assert_eq!(
1080 /// map_filter_project,
1081 /// expected_optimized,
1082 /// );
1083 /// ```
1084 ///
1085 /// Expressions may not be memoized if they are not certain to be evaluated,
1086 /// for example if they occur in conditional branches of a `MirScalarExpr::If`.
1087 ///
1088 /// ```rust
1089 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1090 /// // Demonstrate extraction of unconditionally evaluated expressions, as well as
1091 /// // the non-extraction of common expressions guarded by conditions.
1092 /// let mut map_filter_project = MapFilterProject::new(2)
1093 /// .map(vec![
1094 /// MirScalarExpr::If {
1095 /// cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt)),
1096 /// then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1097 /// els: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1098 /// },
1099 /// MirScalarExpr::If {
1100 /// cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt)),
1101 /// then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1102 /// els: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1103 /// },
1104 /// ]);
1105 ///
1106 /// let mut expected_optimized = MapFilterProject::new(2)
1107 /// .map(vec![
1108 /// MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt),
1109 /// MirScalarExpr::If {
1110 /// cond: Box::new(MirScalarExpr::column(2)),
1111 /// then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1112 /// els: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1113 /// },
1114 /// MirScalarExpr::column(3),
1115 /// MirScalarExpr::If {
1116 /// cond: Box::new(MirScalarExpr::column(2)),
1117 /// then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1118 /// els: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1119 /// },
1120 /// MirScalarExpr::column(5),
1121 /// ])
1122 /// .project(vec![0,1,4,6]);
1123 ///
1124 /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1125 /// map_filter_project.memoize_expressions();
1126 ///
1127 /// assert_eq!(
1128 /// map_filter_project,
1129 /// expected_optimized,
1130 /// );
1131 /// ```
    pub fn memoize_expressions(&mut self) {
        // Record the mapping from starting column references to new column
        // references.
        let mut remaps = BTreeMap::new();
        // Input columns are never memoized and retain their positions.
        for index in 0..self.input_arity {
            remaps.insert(index, index);
        }
        let mut new_expressions = Vec::new();

        // We follow the same order as for evaluation, to ensure that all
        // column references exist in time for their evaluation. We could
        // prioritize predicates, but we would need to be careful to chase
        // down column references to expressions and memoize those as well.
        //
        // NOTE: this relies on `self.predicates` being ordered by the column
        // identifier before which each predicate should first be applied
        // (the invariant documented on the `predicates` field).
        let mut expression = 0;
        for (support, predicate) in self.predicates.iter_mut() {
            // Memoize, in order, every expression this predicate may reference.
            while self.input_arity + expression < *support {
                self.expressions[expression].permute_map(&remaps);
                memoize_expr(
                    &mut self.expressions[expression],
                    &mut new_expressions,
                    self.input_arity,
                );
                // The (now memoized) expression is pushed immediately below, so
                // its new position is just past the current `new_expressions`.
                remaps.insert(
                    self.input_arity + expression,
                    self.input_arity + new_expressions.len(),
                );
                new_expressions.push(self.expressions[expression].clone());
                expression += 1;
            }
            predicate.permute_map(&remaps);
            memoize_expr(predicate, &mut new_expressions, self.input_arity);
        }
        // Memoize any remaining expressions not required by any predicate.
        while expression < self.expressions.len() {
            self.expressions[expression].permute_map(&remaps);
            memoize_expr(
                &mut self.expressions[expression],
                &mut new_expressions,
                self.input_arity,
            );
            remaps.insert(
                self.input_arity + expression,
                self.input_arity + new_expressions.len(),
            );
            new_expressions.push(self.expressions[expression].clone());
            expression += 1;
        }

        self.expressions = new_expressions;
        // Re-point the projection at the relocated expressions.
        for proj in self.projection.iter_mut() {
            *proj = remaps[proj];
        }

        // Restore predicate order invariants.
        for (pos, pred) in self.predicates.iter_mut() {
            *pos = pred.support().into_iter().max().map(|x| x + 1).unwrap_or(0);
        }
    }
1189
1190 /// This method inlines expressions with a single use.
1191 ///
1192 /// This method only inlines expressions; it does not delete expressions
1193 /// that are no longer referenced. The `remove_undemanded()` method does
1194 /// that, and should likely be used after this method.
1195 ///
1196 /// Inlining replaces column references when the referred-to item is either
1197 /// another column reference, or the only referrer of its referent. This
1198 /// is most common after memoization has atomized all expressions to seek
1199 /// out re-use: inlining re-assembles expressions that were not helpfully
1200 /// shared with other expressions.
1201 ///
1202 /// # Example
1203 ///
1204 /// In this example, we see that with only a single reference to columns
1205 /// 0 and 2, their parsing can each be inlined. Similarly, column references
1206 /// can be cleaned up among expressions, and in the final projection.
1207 ///
1208 /// Also notice the remaining expressions, which can be cleaned up in a later
1209 /// pass (the `remove_undemanded` method).
1210 ///
1211 /// ```rust
1212 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1213 /// // Use the output from first `memoize_expression` example.
1214 /// let mut map_filter_project = MapFilterProject::new(5)
1215 /// .map(vec![
1216 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1217 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1218 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), func::AddInt64),
1219 /// MirScalarExpr::column(7),
1220 /// MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1221 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), func::AddInt64),
1222 /// MirScalarExpr::column(10),
1223 /// ])
1224 /// .project(vec![3,4,8,11]);
1225 ///
1226 /// let mut expected_optimized = MapFilterProject::new(5)
1227 /// .map(vec![
1228 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1229 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1230 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1231 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1232 /// MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1233 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1234 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1235 /// ])
1236 /// .project(vec![3,4,8,11]);
1237 ///
1238 /// // Inline expressions that are referenced only once.
1239 /// map_filter_project.inline_expressions();
1240 ///
1241 /// assert_eq!(
1242 /// map_filter_project,
1243 /// expected_optimized,
1244 /// );
1245 /// ```
    pub fn inline_expressions(&mut self) {
        // Local copy of input_arity to avoid borrowing `self` in closures.
        let input_arity = self.input_arity;
        // Reference counts track the number of places that a reference occurs.
        // NOTE: the element type is inferred (`i32`, as nothing constrains it to
        // `usize`), which matters for the `- 1` arithmetic further below.
        let mut reference_count = vec![0; input_arity + self.expressions.len()];
        // Increment reference counts for each use in expressions, predicates,
        // and the final projection.
        for expr in self.expressions.iter() {
            expr.visit_pre(|e| {
                if let MirScalarExpr::Column(i, _name) = e {
                    reference_count[*i] += 1;
                }
            });
        }
        for (_, pred) in self.predicates.iter() {
            pred.visit_pre(|e| {
                if let MirScalarExpr::Column(i, _name) = e {
                    reference_count[*i] += 1;
                }
            });
        }
        for proj in self.projection.iter() {
            reference_count[*proj] += 1;
        }

        // Determine which expressions should be inlined because they reference temporal expressions.
        let mut is_temporal = vec![false; input_arity];
        for expr in self.expressions.iter() {
            // An expression may contain a temporal expression, or reference a column containing such.
            // Pushing in order means `is_temporal[col]` is already set for any prior column.
            is_temporal.push(
                expr.contains_temporal() || expr.support().into_iter().any(|col| is_temporal[col]),
            );
        }

        // Inline only those columns that 1. are expressions not inputs, and
        // 2a. are column references or literals or 2b. have a refcount of 1,
        // or 2c. reference temporal expressions (which cannot be evaluated).
        let mut should_inline = vec![false; reference_count.len()];
        for i in (input_arity..reference_count.len()).rev() {
            if let MirScalarExpr::Column(c, _) = self.expressions[i - input_arity] {
                should_inline[i] = true;
                // The reference count of the referenced column should be
                // incremented with the number of references
                // `self.expressions[i - input_arity]` has.
                // Subtract 1 because `self.expressions[i - input_arity]` is
                // itself a reference. (Signed arithmetic: a refcount of zero
                // yields -1 here rather than underflowing.)
                reference_count[c] += reference_count[i] - 1;
            } else {
                should_inline[i] = reference_count[i] == 1 || is_temporal[i];
            }
        }
        // Inline expressions per `should_inline`.
        self.perform_inlining(should_inline);
        // We can only inline column references in `self.projection`, but we should.
        for proj in self.projection.iter_mut() {
            if *proj >= self.input_arity {
                if let MirScalarExpr::Column(i, _) = self.expressions[*proj - self.input_arity] {
                    // TODO(mgree) !!! propagate name information to projection
                    *proj = i;
                }
            }
        }
    }
1308
1309 /// Inlines those expressions that are indicated by should_inline.
1310 /// See `inline_expressions` for usage.
1311 pub fn perform_inlining(&mut self, should_inline: Vec<bool>) {
1312 for index in 0..self.expressions.len() {
1313 let (prior, expr) = self.expressions.split_at_mut(index);
1314 #[allow(deprecated)]
1315 expr[0].visit_mut_post_nolimit(&mut |e| {
1316 if let MirScalarExpr::Column(i, _name) = e {
1317 if should_inline[*i] {
1318 *e = prior[*i - self.input_arity].clone();
1319 }
1320 }
1321 });
1322 }
1323 for (_index, pred) in self.predicates.iter_mut() {
1324 let expressions = &self.expressions;
1325 #[allow(deprecated)]
1326 pred.visit_mut_post_nolimit(&mut |e| {
1327 if let MirScalarExpr::Column(i, _name) = e {
1328 if should_inline[*i] {
1329 *e = expressions[*i - self.input_arity].clone();
1330 }
1331 }
1332 });
1333 }
1334 }
1335
1336 /// Removes unused expressions from `self.expressions`.
1337 ///
1338 /// Expressions are "used" if they are relied upon by any output columns
1339 /// or any predicates, even transitively. Any expressions that are not
1340 /// relied upon in this way can be discarded.
1341 ///
1342 /// # Example
1343 ///
1344 /// ```rust
1345 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1346 /// // Use the output from `inline_expression` example.
1347 /// let mut map_filter_project = MapFilterProject::new(5)
1348 /// .map(vec![
1349 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1350 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1351 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1352 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1353 /// MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1354 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1355 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1356 /// ])
1357 /// .project(vec![3,4,8,11]);
1358 ///
1359 /// let mut expected_optimized = MapFilterProject::new(5)
1360 /// .map(vec![
1361 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1362 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(5), func::AddInt64),
1363 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1364 /// ])
1365 /// .project(vec![3,4,6,7]);
1366 ///
1367 /// // Remove undemanded expressions, streamlining the work done..
1368 /// map_filter_project.remove_undemanded();
1369 ///
1370 /// assert_eq!(
1371 /// map_filter_project,
1372 /// expected_optimized,
1373 /// );
1374 /// ```
    pub fn remove_undemanded(&mut self) {
        // Determine the demanded expressions to remove irrelevant ones.
        let mut demand = BTreeSet::new();
        for (_index, pred) in self.predicates.iter() {
            demand.extend(pred.support());
        }
        // Start from the output columns as presumed demanded.
        // If this is not the case, the caller should project some away.
        demand.extend(self.projection.iter().cloned());
        // Proceed in *reverse* order, as expressions may depend on other
        // expressions that precede them.
        for index in (0..self.expressions.len()).rev() {
            if demand.contains(&(self.input_arity + index)) {
                demand.extend(self.expressions[index].support());
            }
        }

        // Maintain a map from initial column identifiers to locations
        // once we have removed undemanded expressions.
        let mut remap = BTreeMap::new();
        // This map only needs to map elements of `demand` to a new location,
        // but the logic is easier if we include all input columns (as the
        // new position is then determined by the size of the map).
        for index in 0..self.input_arity {
            remap.insert(index, index);
        }
        // Retain demanded expressions, and record their new locations.
        // `remap.len()` is exactly the next free position, since every retained
        // column (input or expression) has inserted one entry before it.
        let mut new_expressions = Vec::new();
        for (index, expr) in self.expressions.drain(..).enumerate() {
            if demand.contains(&(index + self.input_arity)) {
                remap.insert(index + self.input_arity, remap.len());
                new_expressions.push(expr);
            }
        }
        self.expressions = new_expressions;

        // Update column identifiers; rebuild `Self` to re-establish any invariants.
        // We mirror `self.permute(&remap)` but we specifically want to remap columns
        // that are produced by `self.expressions` after the input columns.
        let (expressions, predicates, projection) = self.as_map_filter_project();
        *self = Self::new(self.input_arity)
            .map(expressions.into_iter().map(|mut e| {
                e.permute_map(&remap);
                e
            }))
            .filter(predicates.into_iter().map(|mut p| {
                p.permute_map(&remap);
                p
            }))
            .project(projection.into_iter().map(|c| remap[&c]));
    }
1426}
1427
1428// TODO: move this elsewhere?
/// Recursively memoize parts of `expr`, storing those parts in `memoized_parts`.
///
/// A part of `expr` that is memoized is replaced by a reference to column
/// `(input_arity + pos)`, where `pos` is the position of the memoized part in
/// `memoized_parts`, and `input_arity` is the arity of the input that `expr`
/// refers to.
pub fn memoize_expr(
    expr: &mut MirScalarExpr,
    memoized_parts: &mut Vec<MirScalarExpr>,
    input_arity: usize,
) {
    #[allow(deprecated)]
    expr.visit_mut_pre_post_nolimit(
        // Pre-order action: returning `Some(children)` restricts descent to
        // exactly those children, so expressions that are only conditionally
        // evaluated are never memoized.
        &mut |e| {
            // We should not eagerly memoize `if` branches that might not be taken.
            // TODO: Memoize expressions in the intersection of `then` and `els`.
            if let MirScalarExpr::If { cond, .. } = e {
                return Some(vec![cond]);
            }

            // We should not eagerly memoize `COALESCE` expressions after the first,
            // as they are only meant to be evaluated if the preceding expressions
            // evaluate to NULL. We could memoize any preceding by expressions that
            // are certain not to error.
            if let MirScalarExpr::CallVariadic {
                func: crate::VariadicFunc::Coalesce(_),
                exprs,
            } = e
            {
                return Some(exprs.iter_mut().take(1).collect());
            }

            // We should not deconstruct temporal filters, because `MfpPlan::create_from` expects
            // those to be in a specific form. However, we _should_ attend to the expression that is
            // on the opposite side of mz_now(), because it might be a complex expression in itself,
            // and is ok to deconstruct.
            if let Some((_func, other_side)) = e.as_mut_temporal_filter().ok() {
                return Some(vec![other_side]);
            }

            None
        },
        // Post-order action: by now all (visited) children have been memoized,
        // so `e`'s subexpressions are column references where applicable.
        &mut |e| {
            match e {
                MirScalarExpr::Literal(_, _) => {
                    // Literals do not need to be memoized.
                }
                MirScalarExpr::Column(col, _) => {
                    // Column references do not need to be memoized, but may need to be
                    // updated if they reference a column reference themselves.
                    //
                    // NOTE(review): this uses `>` rather than `>=`, so a reference to
                    // exactly `input_arity` (the first memoized part) is never chased.
                    // Presumably only a missed canonicalization, not a correctness
                    // issue — confirm.
                    if *col > input_arity {
                        if let MirScalarExpr::Column(col2, _) = memoized_parts[*col - input_arity] {
                            // We do _not_ propagate column names, since mis-associating names and column
                            // references will be very confusing (and possibly bug-inducing).
                            *col = col2;
                        }
                    }
                }
                _ => {
                    // TODO: OOO (Optimizer Optimization Opportunity):
                    // we are quadratic in expression size because of this .iter().position
                    if let Some(position) = memoized_parts.iter().position(|e2| e2 == e) {
                        // Any complex expression that already exists as a prior column can
                        // be replaced by a reference to that column.
                        *e = MirScalarExpr::column(input_arity + position);
                    } else {
                        // A complex expression that does not exist should be memoized, and
                        // replaced by a reference to the column.
                        memoized_parts.push(std::mem::replace(
                            e,
                            MirScalarExpr::column(input_arity + memoized_parts.len()),
                        ));
                    }
                }
            }
        },
    )
}
1507
1508pub mod util {
1509 use std::collections::BTreeMap;
1510
1511 use crate::MirScalarExpr;
1512
    // Currently unused (hence `dead_code`); kept as documentation of the intended
    // triple of actions. NOTE(review): confirm whether a consumer is planned.
    #[allow(dead_code)]
    /// A triple of actions that map from rows to (key, val) pairs and back again.
    struct KeyValRowMapping {
        /// Expressions to apply to a row to produce key datums.
        to_key: Vec<MirScalarExpr>,
        /// Columns to project from a row to produce residual value datums.
        to_val: Vec<usize>,
        /// Columns to project from the concatenation of key and value to reconstruct the row.
        to_row: Vec<usize>,
    }
1523
1524 /// Derive supporting logic to support transforming rows to (key, val) pairs,
1525 /// and back again.
1526 ///
1527 /// We are given as input a list of key expressions and an input arity, and the
1528 /// requirement the produced key should be the application of the key expressions.
1529 /// To produce the `val` output, we will identify those input columns not found in
1530 /// the key expressions, and name all other columns.
1531 /// To reconstitute the original row, we identify the sequence of columns from the
1532 /// concatenation of key and val which would reconstruct the original row.
1533 ///
1534 /// The output is a pair of column sequences, the first used to reconstruct a row
1535 /// from the concatenation of key and value, and the second to identify the columns
1536 /// of a row that should become the value associated with its key.
1537 ///
1538 /// The permutations and thinning expressions generated here will be tracked in
1539 /// `dataflow::plan::AvailableCollections`; see the
1540 /// documentation there for more details.
1541 pub fn permutation_for_arrangement(
1542 key: &[MirScalarExpr],
1543 unthinned_arity: usize,
1544 ) -> (Vec<usize>, Vec<usize>) {
1545 let columns_in_key: BTreeMap<_, _> = key
1546 .iter()
1547 .enumerate()
1548 .filter_map(|(i, key_col)| key_col.as_column().map(|c| (c, i)))
1549 .collect();
1550 let mut input_cursor = key.len();
1551 let permutation = (0..unthinned_arity)
1552 .map(|c| {
1553 if let Some(c) = columns_in_key.get(&c) {
1554 // Column is in key (and thus gone from the value
1555 // of the thinned representation)
1556 *c
1557 } else {
1558 // Column remains in value of the thinned representation
1559 input_cursor += 1;
1560 input_cursor - 1
1561 }
1562 })
1563 .collect();
1564 let thinning = (0..unthinned_arity)
1565 .filter(|c| !columns_in_key.contains_key(c))
1566 .collect();
1567 (permutation, thinning)
1568 }
1569
1570 /// Given the permutations (see [`permutation_for_arrangement`] and
1571 /// (`dataflow::plan::AvailableCollections`) corresponding to two
1572 /// collections with the same key arity,
1573 /// computes the permutation for the result of joining them.
1574 pub fn join_permutations(
1575 key_arity: usize,
1576 stream_permutation: Vec<usize>,
1577 thinned_stream_arity: usize,
1578 lookup_permutation: Vec<usize>,
1579 ) -> BTreeMap<usize, usize> {
1580 let stream_arity = stream_permutation.len();
1581 let lookup_arity = lookup_permutation.len();
1582
1583 (0..stream_arity + lookup_arity)
1584 .map(|i| {
1585 let location = if i < stream_arity {
1586 stream_permutation[i]
1587 } else {
1588 let location_in_lookup = lookup_permutation[i - stream_arity];
1589 if location_in_lookup < key_arity {
1590 location_in_lookup
1591 } else {
1592 location_in_lookup + thinned_stream_arity
1593 }
1594 };
1595 (i, location)
1596 })
1597 .collect()
1598 }
1599}
1600
1601pub mod plan {
1602 use std::iter;
1603
1604 use mz_repr::{Datum, Diff, Row, RowArena};
1605 use serde::{Deserialize, Serialize};
1606
1607 use crate::{BinaryFunc, EvalError, MapFilterProject, MirScalarExpr, UnaryFunc, func};
1608
    /// A wrapper type which indicates it is safe to simply evaluate all expressions.
    ///
    /// In contrast to `MfpPlan`, no temporal reasoning is required: the plan
    /// can be evaluated row by row without consulting a timestamp.
    #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
    pub struct SafeMfpPlan {
        /// The wrapped operator. Constructors (e.g. `MfpPlan::create_from`,
        /// which strips temporal predicates first) are responsible for
        /// ensuring it is directly evaluable.
        pub(crate) mfp: MapFilterProject,
    }
1614
    impl SafeMfpPlan {
        /// Remaps references to input columns according to `remap`.
        ///
        /// Leaves other column references, e.g. to newly mapped columns, unchanged.
        pub fn permute_fn<F>(&mut self, remap: F, new_arity: usize)
        where
            F: Fn(usize) -> usize,
        {
            self.mfp.permute_fn(remap, new_arity);
        }
        /// Evaluates the linear operator on a supplied list of datums.
        ///
        /// The arguments are the initial datums associated with the row,
        /// and an appropriately lifetimed arena for temporary allocations
        /// needed by scalar evaluation.
        ///
        /// An `Ok` result will either be `None` if any predicate did not
        /// evaluate to `Datum::True`, or the values of the columns listed
        /// by `self.projection` if all predicates passed. If an error
        /// occurs in the evaluation it is returned as an `Err` variant.
        /// As the evaluation exits early with failed predicates, it may
        /// miss some errors that would occur later in evaluation.
        ///
        /// The `row` is not cleared first, but emptied if the function
        /// returns `Ok(Some(row))`.
        #[inline(always)]
        pub fn evaluate_into<'a, 'row>(
            &'a self,
            datums: &mut Vec<Datum<'a>>,
            arena: &'a RowArena,
            row_buf: &'row mut Row,
        ) -> Result<Option<&'row Row>, EvalError> {
            let passed_predicates = self.evaluate_inner(datums, arena)?;
            if !passed_predicates {
                Ok(None)
            } else {
                // All predicates passed: pack the projected columns into `row_buf`.
                row_buf
                    .packer()
                    .extend(self.mfp.projection.iter().map(|c| datums[*c]));
                Ok(Some(row_buf))
            }
        }

        /// A version of `evaluate` which produces an iterator over `Datum`
        /// as output.
        ///
        /// This version can be useful when one wants to capture the resulting
        /// datums without packing and then unpacking a row.
        #[inline(always)]
        pub fn evaluate_iter<'b, 'a: 'b>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
        ) -> Result<Option<impl Iterator<Item = Datum<'a>> + 'b>, EvalError> {
            let passed_predicates = self.evaluate_inner(datums, arena)?;
            if !passed_predicates {
                Ok(None)
            } else {
                Ok(Some(self.mfp.projection.iter().map(move |i| datums[*i])))
            }
        }

        /// Populates `datums` with `self.expressions` and tests `self.predicates`.
        ///
        /// This does not apply `self.projection`, which is up to the calling method.
        ///
        /// Returns `Ok(false)` as soon as any predicate does not evaluate to
        /// `Datum::True`; expressions that would only be needed by later
        /// predicates are then never evaluated.
        pub fn evaluate_inner<'b, 'a: 'b>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
        ) -> Result<bool, EvalError> {
            // Index of the next entry of `self.mfp.expressions` to evaluate.
            let mut expression = 0;
            // Interleave predicate checks with expression evaluation: each
            // predicate is paired with the number of columns (`support`) that
            // must exist before it can be tested, so we materialize just
            // enough mapped expressions ahead of each predicate.
            for (support, predicate) in self.mfp.predicates.iter() {
                while self.mfp.input_arity + expression < *support {
                    datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
                    expression += 1;
                }
                if predicate.eval(&datums[..], arena)? != Datum::True {
                    return Ok(false);
                }
            }
            // All predicates passed; evaluate any remaining mapped expressions.
            while expression < self.mfp.expressions.len() {
                datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
                expression += 1;
            }
            Ok(true)
        }

        /// Returns true if evaluation could introduce an error on non-error inputs.
        pub fn could_error(&self) -> bool {
            self.mfp.predicates.iter().any(|(_pos, e)| e.could_error())
                || self.mfp.expressions.iter().any(|e| e.could_error())
        }

        /// Returns true when `Self` is the identity.
        pub fn is_identity(&self) -> bool {
            self.mfp.is_identity()
        }
    }
1713
    // Expose read-only access to the wrapped `MapFilterProject`, so callers
    // can use the inner operator's inspection methods directly on the plan.
    impl std::ops::Deref for SafeMfpPlan {
        type Target = MapFilterProject;
        fn deref(&self) -> &Self::Target {
            &self.mfp
        }
    }
1720
    /// Predicates partitioned into temporal and non-temporal.
    ///
    /// Temporal predicates require some recognition to determine their
    /// structure, and it is best to do that once and re-use the results.
    ///
    /// There are restrictions on the temporal predicates we currently support.
    /// They must directly constrain `MzNow` from below or above,
    /// by expressions that do not themselves contain `MzNow`.
    /// Conjunctions of such constraints are also ok.
    #[derive(Clone, Debug, PartialEq)]
    pub struct MfpPlan {
        /// Normal predicates to evaluate on `&[Datum]` and expect `Ok(Datum::True)`.
        pub(crate) mfp: SafeMfpPlan,
        /// Expressions that when evaluated lower-bound `MzNow`.
        ///
        /// Each must evaluate to `Datum::MzTimestamp` (or `Datum::Null`, which
        /// suppresses the record); the bound is inclusive.
        pub(crate) lower_bounds: Vec<MirScalarExpr>,
        /// Expressions that when evaluated upper-bound `MzNow`.
        ///
        /// Each must evaluate to `Datum::MzTimestamp` (or `Datum::Null`, which
        /// suppresses the record); the bound is exclusive (the retraction time).
        pub(crate) upper_bounds: Vec<MirScalarExpr>,
    }
1739
    impl MfpPlan {
        /// Partitions `predicates` into non-temporal, and lower and upper temporal bounds.
        ///
        /// The first returned list is of predicates that do not contain `mz_now`.
        /// The second and third returned lists contain expressions that, once evaluated, lower
        /// and upper bound the validity interval of a record, respectively. These second two
        /// lists are populated only by binary expressions of the form
        /// ```ignore
        /// mz_now cmp_op expr
        /// ```
        /// where `cmp_op` is a comparison operator and `expr` does not contain `mz_now`.
        ///
        /// If any unsupported expression is found, for example one that uses `mz_now`
        /// in an unsupported position, an error is returned.
        pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, String> {
            let mut lower_bounds = Vec::new();
            let mut upper_bounds = Vec::new();

            let mut temporal = Vec::new();

            // Optimize, to ensure that temporal predicates are moved into `mfp.predicates`.
            mfp.optimize();

            // Split off the temporal predicates; the non-temporal ones remain in `mfp`.
            mfp.predicates.retain(|(_position, predicate)| {
                if predicate.contains_temporal() {
                    temporal.push(predicate.clone());
                    false
                } else {
                    true
                }
            });

            for mut predicate in temporal.into_iter() {
                // Decompose the predicate as `mz_now cmp_op expr2`; unsupported
                // shapes surface as an error here.
                let (func, expr2) = predicate.as_mut_temporal_filter()?;
                let expr2 = expr2.clone();

                // LogicalTimestamp <OP> <EXPR2> for several supported operators.
                // Lower bounds are inclusive and upper bounds exclusive (see
                // `evaluate`), so non-strict comparisons are normalized by
                // stepping the bound to the next timestamp via `StepMzTimestamp`.
                match func {
                    BinaryFunc::Eq(_) => {
                        // Valid for exactly the timestamp `expr2`: the
                        // half-open interval [expr2, step(expr2)).
                        lower_bounds.push(expr2.clone());
                        upper_bounds.push(
                            expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
                        );
                    }
                    BinaryFunc::Lt(_) => {
                        upper_bounds.push(expr2.clone());
                    }
                    BinaryFunc::Lte(_) => {
                        upper_bounds.push(
                            expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
                        );
                    }
                    BinaryFunc::Gt(_) => {
                        lower_bounds.push(
                            expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
                        );
                    }
                    BinaryFunc::Gte(_) => {
                        lower_bounds.push(expr2.clone());
                    }
                    _ => {
                        return Err(format!("Unsupported binary temporal operation: {:?}", func));
                    }
                }
            }

            Ok(Self {
                mfp: SafeMfpPlan { mfp },
                lower_bounds,
                upper_bounds,
            })
        }

        /// Indicates if the planned `MapFilterProject` emits exactly its inputs as outputs.
        pub fn is_identity(&self) -> bool {
            self.mfp.mfp.is_identity()
                && self.lower_bounds.is_empty()
                && self.upper_bounds.is_empty()
        }

        /// Returns `self`, and leaves behind an identity operator that acts on its output.
        pub fn take(&mut self) -> Self {
            let mut identity = Self {
                mfp: SafeMfpPlan {
                    // The identity operator's input arity is our output arity.
                    mfp: MapFilterProject::new(self.mfp.projection.len()),
                },
                lower_bounds: Default::default(),
                upper_bounds: Default::default(),
            };
            std::mem::swap(self, &mut identity);
            identity
        }

        /// Attempt to convert self into a non-temporal MapFilterProject plan.
        ///
        /// If that is not possible, the original instance is returned as an error.
        #[allow(clippy::result_large_err)]
        pub fn into_nontemporal(self) -> Result<SafeMfpPlan, Self> {
            if self.lower_bounds.is_empty() && self.upper_bounds.is_empty() {
                Ok(self.mfp)
            } else {
                Err(self)
            }
        }

        /// Returns an iterator over mutable references to all non-temporal
        /// scalar expressions in the plan.
        ///
        /// The order of iteration is unspecified.
        pub fn iter_nontemporal_exprs(&mut self) -> impl Iterator<Item = &mut MirScalarExpr> {
            iter::empty()
                .chain(self.mfp.mfp.predicates.iter_mut().map(|(_, expr)| expr))
                .chain(&mut self.mfp.mfp.expressions)
                .chain(&mut self.lower_bounds)
                .chain(&mut self.upper_bounds)
        }

        /// Evaluate the predicates, temporal and non-, and return times and differences for `data`.
        ///
        /// If `self` contains only non-temporal predicates, the result will either be `(time, diff)`,
        /// or an evaluation error. If `self` contains temporal predicates, the results can be times
        /// that are greater than the input `time`, and may contain negated `diff` values.
        ///
        /// The `row_builder` is not cleared first, but emptied if the function
        /// returns an iterator with any `Ok(_)` element.
        ///
        /// The returned iterator yields at most two elements: an insertion at the
        /// (inclusive) lower bound and, when an upper bound exists, a retraction
        /// at the (exclusive) upper bound. The `use<E, V>` capture list keeps the
        /// opaque return type independent of the `'a`/`'b` input borrows.
        pub fn evaluate<'b, 'a: 'b, E: From<EvalError>, V: Fn(&mz_repr::Timestamp) -> bool>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
            time: mz_repr::Timestamp,
            diff: Diff,
            valid_time: V,
            row_builder: &mut Row,
        ) -> impl Iterator<
            Item = Result<(Row, mz_repr::Timestamp, Diff), (E, mz_repr::Timestamp, Diff)>,
        > + use<E, V> {
            // First, the non-temporal predicates; errors and failed predicates
            // end the evaluation early.
            match self.mfp.evaluate_inner(datums, arena) {
                Err(e) => {
                    return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                }
                Ok(true) => {}
                Ok(false) => {
                    return None.into_iter().chain(None);
                }
            }

            // Lower and upper bounds.
            let mut lower_bound = time;
            let mut upper_bound = None;

            // Track whether we have seen a null in either bound, as this should
            // prevent the record from being produced at any time.
            let mut null_eval = false;

            // Advance our lower bound to be at least the result of any lower bound
            // expressions.
            for l in self.lower_bounds.iter() {
                match l.eval(datums, arena) {
                    Err(e) => {
                        return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                    }
                    Ok(Datum::MzTimestamp(d)) => {
                        lower_bound = lower_bound.max(d);
                    }
                    Ok(Datum::Null) => {
                        null_eval = true;
                    }
                    x => {
                        panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
                    }
                }
            }

            // If the lower bound exceeds our `until` frontier, it should not appear in the output.
            if !valid_time(&lower_bound) {
                return None.into_iter().chain(None);
            }

            // If there are any upper bounds, determine the minimum upper bound.
            for u in self.upper_bounds.iter() {
                // We can cease as soon as the lower and upper bounds match,
                // as the update will certainly not be produced in that case.
                if upper_bound != Some(lower_bound) {
                    match u.eval(datums, arena) {
                        Err(e) => {
                            return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                        }
                        Ok(Datum::MzTimestamp(d)) => {
                            if let Some(upper) = upper_bound {
                                upper_bound = Some(upper.min(d));
                            } else {
                                upper_bound = Some(d);
                            };
                            // Force the upper bound to be at least the lower
                            // bound. The `is_some()` test should always be true
                            // due to the above block, but maintain it here in
                            // case that changes. It's hopefully optimized away.
                            if upper_bound.is_some() && upper_bound < Some(lower_bound) {
                                upper_bound = Some(lower_bound);
                            }
                        }
                        Ok(Datum::Null) => {
                            null_eval = true;
                        }
                        x => {
                            panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
                        }
                    }
                }
            }

            // If the upper bound exceeds our `until` frontier, it should not appear in the output.
            if let Some(upper) = &mut upper_bound {
                if !valid_time(upper) {
                    upper_bound = None;
                }
            }

            // Produce an output only if the upper bound exceeds the lower bound,
            // and if we did not encounter a `null` in our evaluation.
            if Some(lower_bound) != upper_bound && !null_eval {
                row_builder
                    .packer()
                    .extend(self.mfp.mfp.projection.iter().map(|c| datums[*c]));
                // Insert at the lower bound; retract at the upper bound, if any.
                let upper_opt =
                    upper_bound.map(|upper_bound| Ok((row_builder.clone(), upper_bound, -diff)));
                let lower = Some(Ok((row_builder.clone(), lower_bound, diff)));
                lower.into_iter().chain(upper_opt)
            } else {
                None.into_iter().chain(None)
            }
        }

        /// Returns true if evaluation could introduce an error on non-error inputs.
        pub fn could_error(&self) -> bool {
            self.mfp.could_error()
                || self.lower_bounds.iter().any(|e| e.could_error())
                || self.upper_bounds.iter().any(|e| e.could_error())
        }

        /// Indicates that `Self` ignores its input to the extent that it can be evaluated on `&[]`.
        ///
        /// At the moment, this is only true if it projects away all columns and applies no filters,
        /// but it could be extended to plans that produce literals independent of the input.
        pub fn ignores_input(&self) -> bool {
            self.lower_bounds.is_empty()
                && self.upper_bounds.is_empty()
                && self.mfp.mfp.projection.is_empty()
                && self.mfp.mfp.predicates.is_empty()
        }
    }
1991}