mz_expr/linear.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9use std::collections::{BTreeMap, BTreeSet};
10use std::fmt::Display;
11
12use mz_repr::{Datum, Row};
13use serde::{Deserialize, Serialize};
14
15use crate::scalar::optimizable::OptimizableExpr;
16use crate::visit::Visit;
17use crate::{MirRelationExpr, MirScalarExpr};
18
19/// A compound operator that can be applied row-by-row.
20///
21/// This operator integrates the map, filter, and project operators.
22/// It applies a sequences of map expressions, which are allowed to
23/// refer to previous expressions, interleaved with predicates which
24/// must be satisfied for an output to be produced. If all predicates
25/// evaluate to `Datum::True` the data at the identified columns are
26/// collected and produced as output in a packed `Row`.
27///
28/// This operator is a "builder" and its contents may contain expressions
29/// that are not yet executable. For example, it may contain temporal
30/// expressions in `self.expressions`, even though this is not something
31/// we can directly evaluate. The plan creation methods will defensively
32/// ensure that the right thing happens.
33#[derive(
34 Clone,
35 Debug,
36 Eq,
37 PartialEq,
38 Serialize,
39 Deserialize,
40 Hash,
41 Ord,
42 PartialOrd
43)]
44#[serde(bound(deserialize = "E: serde::de::DeserializeOwned"))]
45pub struct MapFilterProject<E: OptimizableExpr = MirScalarExpr> {
46 /// A sequence of expressions that should be appended to the row.
47 ///
48 /// Many of these expressions may not be produced in the output,
49 /// and may only be present as common subexpressions.
50 pub expressions: Vec<E>,
51 /// Expressions that must evaluate to `Datum::True` for the output
52 /// row to be produced.
53 ///
54 /// Each entry is prepended with a column identifier indicating
55 /// the column *before* which the predicate should first be applied.
56 /// Most commonly this would be one plus the largest column identifier
57 /// in the predicate's support, but it could be larger to implement
58 /// guarded evaluation of predicates.
59 ///
60 /// This list should be sorted by the first field.
61 pub predicates: Vec<(usize, E)>,
62 /// A sequence of column identifiers whose data form the output row.
63 pub projection: Vec<usize>,
64 /// The expected number of input columns.
65 ///
66 /// This is needed to ensure correct identification of newly formed
67 /// columns in the output.
68 pub input_arity: usize,
69}
70
71impl<E: OptimizableExpr + Display> Display for MapFilterProject<E> {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 writeln!(f, "MapFilterProject(")?;
74 writeln!(f, " expressions:")?;
75 self.expressions
76 .iter()
77 .enumerate()
78 .try_for_each(|(i, e)| writeln!(f, " #{} <- {},", i + self.input_arity, e))?;
79 writeln!(f, " predicates:")?;
80 self.predicates
81 .iter()
82 .try_for_each(|(before, p)| writeln!(f, " <before: {}> {},", before, p))?;
83 writeln!(f, " projection: {:?}", self.projection)?;
84 writeln!(f, " input_arity: {}", self.input_arity)?;
85 writeln!(f, ")")
86 }
87}
88
89impl<E: OptimizableExpr> MapFilterProject<E> {
90 /// Create a no-op operator for an input of a supplied arity.
91 pub fn new(input_arity: usize) -> Self {
92 Self {
93 expressions: Vec::new(),
94 predicates: Vec::new(),
95 projection: (0..input_arity).collect(),
96 input_arity,
97 }
98 }
99
100 /// Given two mfps, return an mfp that applies one
101 /// followed by the other.
102 /// Note that the arguments are in the opposite order
103 /// from how function composition is usually written in mathematics.
104 pub fn compose(before: Self, after: Self) -> Self {
105 let (m, f, p) = after.into_map_filter_project();
106 before.map(m).filter(f).project(p)
107 }
108
109 /// True if the operator describes the identity transformation.
110 pub fn is_identity(&self) -> bool {
111 self.expressions.is_empty()
112 && self.predicates.is_empty()
113 && self.projection.len() == self.input_arity
114 && self.projection.iter().enumerate().all(|(i, p)| i == *p)
115 }
116
117 /// Retain only the indicated columns in the presented order.
118 pub fn project<I>(mut self, columns: I) -> Self
119 where
120 I: IntoIterator<Item = usize> + std::fmt::Debug,
121 {
122 self.projection = columns.into_iter().map(|c| self.projection[c]).collect();
123 self
124 }
125
126 /// Retain only rows satisfying these predicates.
127 ///
128 /// This method introduces predicates as eagerly as they can be evaluated,
129 /// which may not be desired for predicates that may cause exceptions.
130 /// If fine manipulation is required, the predicates can be added manually.
131 pub fn filter<I>(mut self, predicates: I) -> Self
132 where
133 I: IntoIterator<Item = E>,
134 {
135 for mut predicate in predicates {
136 // Correct column references.
137 predicate.permute(&self.projection[..]);
138
139 // Validate column references.
140 assert!(
141 predicate
142 .support()
143 .into_iter()
144 .all(|c| c < self.input_arity + self.expressions.len())
145 );
146
147 // Insert predicate as eagerly as it can be evaluated:
148 // just after the largest column in its support is formed.
149 let max_support = predicate
150 .support()
151 .into_iter()
152 .max()
153 .map(|c| c + 1)
154 .unwrap_or(0);
155 self.predicates.push((max_support, predicate))
156 }
157 // Stable sort predicates by position at which they take effect.
158 // We put literal errors at the end as a stop-gap to avoid erroring
159 // before we are able to evaluate any predicates that might prevent it.
160 self.predicates
161 .sort_by_key(|(position, predicate)| (E::is_literal_err(predicate), *position));
162 self
163 }
164
165 /// Append the result of evaluating expressions to each row.
166 pub fn map<I>(mut self, expressions: I) -> Self
167 where
168 I: IntoIterator<Item = E>,
169 {
170 for mut expression in expressions {
171 // Correct column references.
172 expression.permute(&self.projection[..]);
173
174 // Validate column references.
175 assert!(
176 expression
177 .support()
178 .into_iter()
179 .all(|c| c < self.input_arity + self.expressions.len())
180 );
181
182 // Introduce expression and produce as output.
183 self.expressions.push(expression);
184 self.projection
185 .push(self.input_arity + self.expressions.len() - 1);
186 }
187
188 self
189 }
190
191 /// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning.
192 pub fn into_map_filter_project(self) -> (Vec<E>, Vec<E>, Vec<usize>) {
193 let predicates = self
194 .predicates
195 .into_iter()
196 .map(|(_pos, predicate)| predicate)
197 .collect();
198 (self.expressions, predicates, self.projection)
199 }
200
201 /// As the arguments to `Map`, `Filter`, and `Project` operators.
202 ///
203 /// In principle, this operator can be implemented as a sequence of
204 /// more elemental operators, likely less efficiently.
205 pub fn as_map_filter_project(&self) -> (Vec<E>, Vec<E>, Vec<usize>) {
206 self.clone().into_map_filter_project()
207 }
208}
209
// Methods that are specific to `MirScalarExpr`: they pattern-match on its
// variants, call `as_literal`, or operate on `MirRelationExpr`.
impl MapFilterProject<MirScalarExpr> {
    /// Determines if a scalar expression must be equal to a literal datum.
    ///
    /// Scans `self.predicates` in order for a binary `Eq` call in which one
    /// operand's `as_literal()` yields `Some(Ok(..))` and the other operand is
    /// equal to `expr`. Returns the literal datum of the first match, or
    /// `None` if no predicate matches.
    pub fn literal_constraint(&self, expr: &MirScalarExpr) -> Option<Datum<'_>> {
        for (_pos, predicate) in self.predicates.iter() {
            if let MirScalarExpr::CallBinary {
                func: crate::BinaryFunc::Eq(_),
                expr1,
                expr2,
            } = predicate
            {
                // Accept both operand orders: `literal = expr` ...
                if let Some(Ok(datum1)) = expr1.as_literal() {
                    if &**expr2 == expr {
                        return Some(datum1);
                    }
                }
                // ... and `expr = literal`.
                if let Some(Ok(datum2)) = expr2.as_literal() {
                    if &**expr1 == expr {
                        return Some(datum2);
                    }
                }
            }
        }
        None
    }

    /// Determines if a sequence of scalar expressions must be equal to a literal row.
    ///
    /// On success, the returned row contains, in order, the literal that each
    /// expression is constrained to (as found by
    /// [`MapFilterProject::literal_constraint`]). If any expression is
    /// unconstrained, the result is `None`.
    ///
    /// This method returns `None` on an empty `exprs`, which might be surprising, but
    /// seems to line up with its callers' expectations of that being a non-constraint.
    /// The caller knows if `exprs` is empty, and can modify their behavior appropriately
    /// if they would rather have a literal empty row.
    pub fn literal_constraints(&self, exprs: &[MirScalarExpr]) -> Option<Row> {
        if exprs.is_empty() {
            return None;
        }
        let mut row = Row::default();
        let mut packer = row.packer();
        for expr in exprs {
            if let Some(literal) = self.literal_constraint(expr) {
                packer.push(literal);
            } else {
                return None;
            }
        }
        Some(row)
    }

    /// Extracts any MapFilterProject at the root of the expression.
    ///
    /// Descends through the `Map`, `Filter`, and `Project` operators at the
    /// root of `expr`, folds them into the returned `Self`, and also returns a
    /// reference to the first expression below them. `expr` itself is not
    /// modified. If the root is none of these operators, the returned operator
    /// is the identity on `expr`'s arity.
    ///
    /// The extracted expressions may contain temporal predicates, and one
    /// should be careful not to apply them blindly.
    pub fn extract_from_expression(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
        // TODO: This could become iterative rather than recursive if
        // we were able to fuse MFP operators from below, rather than
        // from above.
        match expr {
            MirRelationExpr::Map { input, scalars } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.map(scalars.iter().cloned()), expr)
            }
            MirRelationExpr::Filter { input, predicates } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.filter(predicates.iter().cloned()), expr)
            }
            MirRelationExpr::Project { input, outputs } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.project(outputs.iter().cloned()), expr)
            }
            // TODO: The recursion is quadratic in the number of Map/Filter/Project operators due to
            // this call to `arity()`.
            x => (Self::new(x.arity()), x),
        }
    }

    /// Extracts an error-free MapFilterProject at the root of the expression.
    ///
    /// Descends through the `Map`, `Filter`, and `Project` operators at the
    /// root of `expr`, folds them into the returned `Self`, and also returns a
    /// reference to the first expression not extracted. `expr` itself is not
    /// modified. Extraction halts at a `Map` or `Filter` containing a literal
    /// error (per `is_literal_err`), and that operator is the returned
    /// expression. If nothing can be extracted, the returned operator is the
    /// identity.
    ///
    /// This method is meant to be used during optimization, where it is
    /// necessary to avoid moving around maps and filters with errors.
    pub fn extract_non_errors_from_expr(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
        match expr {
            MirRelationExpr::Map { input, scalars }
                if scalars.iter().all(|s| !s.is_literal_err()) =>
            {
                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
                (mfp.map(scalars.iter().cloned()), expr)
            }
            MirRelationExpr::Filter { input, predicates }
                if predicates.iter().all(|p| !p.is_literal_err()) =>
            {
                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
                (mfp.filter(predicates.iter().cloned()), expr)
            }
            MirRelationExpr::Project { input, outputs } => {
                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
                (mfp.project(outputs.iter().cloned()), expr)
            }
            x => (Self::new(x.arity()), x),
        }
    }

    /// Extracts an error-free MapFilterProject at the root of the expression.
    ///
    /// Differs from [MapFilterProject::extract_non_errors_from_expr] by taking and returning a
    /// mutable reference. `expr` itself is not restructured. The returned reference points at
    /// the first expression not extracted.
    pub fn extract_non_errors_from_expr_ref_mut(
        expr: &mut MirRelationExpr,
    ) -> (Self, &mut MirRelationExpr) {
        // This is essentially the same code as `extract_non_errors_from_expr`, except the seemingly
        // superfluous outer if, which works around a borrow-checker issue:
        // https://github.com/rust-lang/rust/issues/54663
        // The outer condition must stay in sync with the guards of the inner `match`. Otherwise
        // the `unreachable!()` arm below could be hit.
        if matches!(
            expr,
            MirRelationExpr::Map { input: _, scalars }
                if scalars.iter().all(|s| !s.is_literal_err())
        ) || matches!(
            expr,
            MirRelationExpr::Filter { input: _, predicates }
                if predicates.iter().all(|p| !p.is_literal_err())
        ) || matches!(expr, MirRelationExpr::Project { .. })
        {
            match expr {
                MirRelationExpr::Map { input, scalars }
                    if scalars.iter().all(|s| !s.is_literal_err()) =>
                {
                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
                    (mfp.map(scalars.iter().cloned()), expr)
                }
                MirRelationExpr::Filter { input, predicates }
                    if predicates.iter().all(|p| !p.is_literal_err()) =>
                {
                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
                    (mfp.filter(predicates.iter().cloned()), expr)
                }
                MirRelationExpr::Project { input, outputs } => {
                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
                    (mfp.project(outputs.iter().cloned()), expr)
                }
                _ => unreachable!(),
            }
        } else {
            (Self::new(expr.arity()), expr)
        }
    }

    /// Removes an error-free MapFilterProject from the root of the expression.
    ///
    /// Unlike [`MapFilterProject::extract_non_errors_from_expr`], this modifies `expr` in place.
    /// The `Map`, `Filter`, and `Project` operators at its root are removed from `expr` and
    /// returned as `Self`, leaving `expr` as the first expression that was not extracted.
    /// Extraction halts at a `Map` or `Filter` containing a literal error. If nothing can be
    /// extracted, the method returns an identity operator, and the expression remains unchanged.
    ///
    /// This method is meant to be used during optimization, where it is
    /// necessary to avoid moving around maps and filters with errors.
    pub fn extract_non_errors_from_expr_mut(expr: &mut MirRelationExpr) -> Self {
        match expr {
            MirRelationExpr::Map { input, scalars }
                if scalars.iter().all(|s| !s.is_literal_err()) =>
            {
                let mfp =
                    Self::extract_non_errors_from_expr_mut(input).map(scalars.iter().cloned());
                // `input` has already been stripped by the recursive call; move it into
                // `expr`'s place, which drops this `Map`.
                *expr = input.take_dangerous();
                mfp
            }
            MirRelationExpr::Filter { input, predicates }
                if predicates.iter().all(|p| !p.is_literal_err()) =>
            {
                let mfp = Self::extract_non_errors_from_expr_mut(input)
                    .filter(predicates.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            MirRelationExpr::Project { input, outputs } => {
                let mfp =
                    Self::extract_non_errors_from_expr_mut(input).project(outputs.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            x => Self::new(x.arity()),
        }
    }
}
402
impl<E: OptimizableExpr> MapFilterProject<E> {
    /// Returns `true` if any predicate in this MFP contains a temporal expression (`mz_now()`).
    pub fn has_temporal_predicates(&self) -> bool {
        self.predicates
            .iter()
            .any(|(_, predicate)| OptimizableExpr::contains_temporal(predicate))
    }

    /// Extracts temporal predicates into their own `Self`.
    ///
    /// `self` is optimized first, and then loses its temporal predicates.
    /// Expressions that are used by the temporal predicates are exposed by `self.projection`,
    /// though there could be justification for extracting them as well if they are otherwise
    /// unused. The returned operator consumes the (possibly extended) output of `self`, applies
    /// the temporal predicates, and projects back to the output columns `self` had before this call.
    ///
    /// This separation is valuable when the execution cannot be fused into one operator.
    ///
    /// # Panics
    ///
    /// Panics if, after optimization, any entry of `self.expressions` still contains a temporal
    /// expression.
    pub fn extract_temporal(&mut self) -> Self {
        // Optimize the expression, as it is only post-optimization that we can be certain
        // that temporal expressions are restricted to filters. We could relax this in the
        // future to be only `inline_expressions` and `remove_undemanded`, but optimization
        // seems to be the best fit at the moment.
        self.optimize();

        // Assert that we no longer have temporal expressions to evaluate. This could only
        // fail if the optimization above left temporal expressions in `self.expressions`
        // (e.g. because they are yielded in the output), which is out of spec for how the
        // type is meant to be used.
        assert!(
            !self
                .expressions
                .iter()
                .any(|e| OptimizableExpr::contains_temporal(e))
        );

        // Extract temporal predicates from `self.predicates`.
        let mut temporal_predicates = Vec::new();
        self.predicates.retain(|(_position, predicate)| {
            if OptimizableExpr::contains_temporal(predicate) {
                temporal_predicates.push(predicate.clone());
                false
            } else {
                true
            }
        });

        // Determine extended input columns used by temporal filters.
        let mut support = BTreeSet::new();
        for predicate in temporal_predicates.iter() {
            support.extend(predicate.support());
        }

        // Discover the locations of these columns after `self.projection`.
        // Columns not already projected are appended to the projection.
        let old_projection_len = self.projection.len();
        let mut new_location = BTreeMap::new();
        for original in support.iter() {
            if let Some(position) = self.projection.iter().position(|x| x == original) {
                new_location.insert(*original, position);
            } else {
                new_location.insert(*original, self.projection.len());
                self.projection.push(*original);
            }
        }
        // Permute references in extracted predicates to their new locations.
        for predicate in temporal_predicates.iter_mut() {
            predicate.permute_map(&new_location);
        }

        // Form a new `Self` containing the temporal predicates to return.
        // It projects away any columns appended above, restoring the original output shape.
        Self::new(self.projection.len())
            .filter(temporal_predicates)
            .project(0..old_projection_len)
    }

    /// Extracts common expressions from multiple `Self` into a result `Self`.
    ///
    /// The argument `mfps` are mutated so that each is functionally equivalent to its
    /// corresponding input when composed atop the resulting `Self`.
    ///
    /// With a single argument, that argument is moved out whole and replaced by an identity
    /// operator on its output. With several, expressions and then predicates that all arguments
    /// share, and that depend only on columns the result already produces, are migrated into
    /// the result. Result columns that no argument demands are then projected away.
    ///
    /// # Panics
    ///
    /// Panics if `mfps` is empty.
    pub fn extract_common(mfps: &mut [&mut Self]) -> Self {
        match mfps.len() {
            0 => {
                panic!("Cannot call method on empty arguments");
            }
            1 => {
                let output_arity = mfps[0].projection.len();
                std::mem::replace(mfps[0], MapFilterProject::new(output_arity))
            }
            _ => {
                // More generally, we convert each mfp to ANF, at which point we can
                // repeatedly extract atomic expressions that depend only on input
                // columns, migrate them to an input mfp, and repeat until no such
                // expressions exist. At this point, we can also migrate predicates
                // and then determine and push down projections.

                // Prepare a return `Self`.
                let mut result_mfp = MapFilterProject::new(mfps[0].input_arity);

                // We convert each mfp to ANF, using `memoize_expressions`.
                for mfp in mfps.iter_mut() {
                    mfp.memoize_expressions();
                }

                // We repeatedly extract common expressions, until none remain.
                let mut done = false;
                while !done {
                    // We use references to determine common expressions, and must
                    // introduce a scope here to drop the borrows before mutation.
                    let common = {
                        // The input arity may increase as we iterate, so recapture.
                        let input_arity = result_mfp.projection.len();
                        // Candidates: expressions of the first mfp whose support lies entirely
                        // within the columns `result_mfp` produces (an empty support counts).
                        let mut prev: BTreeSet<_> = mfps[0]
                            .expressions
                            .iter()
                            .filter(|e| e.support().last() < Some(&input_arity))
                            .collect();
                        let mut next = BTreeSet::default();
                        // Intersect the candidates with each remaining mfp's expressions.
                        for mfp in mfps[1..].iter() {
                            for expr in mfp.expressions.iter() {
                                if prev.contains(expr) {
                                    next.insert(expr);
                                }
                            }
                            std::mem::swap(&mut prev, &mut next);
                            next.clear();
                        }
                        prev.into_iter().cloned().collect::<Vec<_>>()
                    };
                    // Without new common expressions, we should terminate the loop.
                    done = common.is_empty();

                    // Migrate each expression in `common` to `result_mfp`.
                    for expr in common.into_iter() {
                        // Update each mfp by removing expr and updating column references.
                        for mfp in mfps.iter_mut() {
                            // With `expr` next in `result_mfp`, it is as if we are rotating it to
                            // be the first expression in `mfp`, and then removing it from `mfp` and
                            // increasing the input arity of `mfp`.
                            let arity = result_mfp.projection.len();
                            let found = mfp.expressions.iter().position(|e| e == &expr).unwrap();
                            let index = arity + found;
                            // Column references change due to the rotation from `index` to `arity`.
                            let action = |c: &mut usize| {
                                if arity <= *c && *c < index {
                                    *c += 1;
                                } else if *c == index {
                                    *c = arity;
                                }
                            };
                            // Rotate `expr` from `found` to first, and then snip.
                            // Short circuit by simply removing and incrementing the input arity.
                            mfp.input_arity += 1;
                            mfp.expressions.remove(found);
                            // Update column references in expressions, predicates, and projections.
                            for e in mfp.expressions.iter_mut() {
                                e.visit_columns(action);
                            }
                            for (o, e) in mfp.predicates.iter_mut() {
                                e.visit_columns(action);
                                // Max out the offset for the predicate; optimization will correct.
                                *o = mfp.input_arity + mfp.expressions.len();
                            }
                            for c in mfp.projection.iter_mut() {
                                action(c);
                            }
                        }
                        // Install the expression in `result_mfp` and expose it as a new output column.
                        result_mfp.expressions.push(expr);
                        result_mfp.projection.push(result_mfp.projection.len());
                    }
                }
                // As before, but easier: predicates in common to all mfps.
                let common_preds: Vec<E> = {
                    let input_arity = result_mfp.projection.len();
                    let mut prev: BTreeSet<_> = mfps[0]
                        .predicates
                        .iter()
                        .map(|(_, e)| e)
                        .filter(|e| e.support().last() < Some(&input_arity))
                        .collect();
                    let mut next = BTreeSet::default();
                    for mfp in mfps[1..].iter() {
                        for (_, expr) in mfp.predicates.iter() {
                            if prev.contains(expr) {
                                next.insert(expr);
                            }
                        }
                        std::mem::swap(&mut prev, &mut next);
                        next.clear();
                    }
                    // Predicates in common, which we will append to `result_mfp.predicates`.
                    prev.into_iter().cloned().collect::<Vec<_>>()
                };
                for mfp in mfps.iter_mut() {
                    mfp.predicates.retain(|(_, p)| !common_preds.contains(p));
                    mfp.optimize();
                }
                result_mfp.predicates.extend(
                    common_preds
                        .into_iter()
                        .map(|e| (result_mfp.projection.len(), e)),
                );

                // Then, look for unused columns and project them away.
                let mut common_demand = BTreeSet::new();
                for mfp in mfps.iter() {
                    common_demand.extend(mfp.demand());
                }
                // columns in `common_demand` must be retained, but others
                // may be discarded.
                let common_demand = (0..result_mfp.projection.len())
                    .filter(|x| common_demand.contains(x))
                    .collect::<Vec<_>>();
                // Map each retained column to its position after the projection.
                let remap = common_demand
                    .iter()
                    .cloned()
                    .enumerate()
                    .map(|(new, old)| (old, new))
                    .collect::<BTreeMap<_, _>>();
                for mfp in mfps.iter_mut() {
                    mfp.permute_fn(|c| remap[&c], common_demand.len());
                }
                result_mfp = result_mfp.project(common_demand);

                // Return the resulting MFP.
                result_mfp.optimize();
                result_mfp
            }
        }
    }

    /// Returns `self`, and leaves behind an identity operator that acts on its output.
    pub fn take(&mut self) -> Self {
        let mut identity = Self::new(self.projection.len());
        std::mem::swap(self, &mut identity);
        identity
    }

    /// Convert the `MapFilterProject` into a staged evaluation plan.
    ///
    /// The main behavior is to extract temporal predicates, which cannot be evaluated
    /// using the standard machinery.
    ///
    /// # Errors
    ///
    /// Returns the error message produced by `plan::MfpPlan::create_from`.
    pub fn into_plan(self) -> Result<plan::MfpPlan<E>, String> {
        plan::MfpPlan::create_from(self)
    }
}
647
648impl<E: OptimizableExpr> MapFilterProject<E> {
649 /// Partitions `self` into two instances, one of which can be eagerly applied.
650 ///
651 /// The `available` argument indicates which input columns are available (keys)
652 /// and in which positions (values). This information may allow some maps and
653 /// filters to execute. The `input_arity` argument reports the total number of
654 /// input columns (which may include some not present in `available`)
655 ///
656 /// This method partitions `self` in two parts, `(before, after)`, where `before`
657 /// can be applied on columns present as keys in `available`, and `after` must
658 /// await the introduction of the other input columns.
659 ///
660 /// The `before` instance will *append* any columns that can be determined from
661 /// `available` but will project away any of these columns that are not needed by
662 /// `after`. Importantly, this means that `before` will leave intact *all* input
663 /// columns including those not referenced in `available`.
664 ///
665 /// The `after` instance will presume all input columns are available, followed
666 /// by the appended columns of the `before` instance. It may be that some input
667 /// columns can be projected away in `before` if `after` does not need them, but
668 /// we leave that as something the caller can apply if needed (it is otherwise
669 /// complicated to negotiate which input columns `before` should retain).
670 ///
671 /// To correctly reconstruct `self` from `before` and `after`, one must introduce
672 /// additional input columns, permute all input columns to their locations as
673 /// expected by `self`, follow this by new columns appended by `before`, and
674 /// remove all other columns that may be present.
675 ///
676 /// # Example
677 ///
678 /// ```rust
679 /// use mz_expr::{BinaryFunc, MapFilterProject, MirScalarExpr, func};
680 ///
681 /// // imagine an action on columns (a, b, c, d).
682 /// let original = MapFilterProject::new(4).map(vec![
683 /// MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::AddInt64),
684 /// MirScalarExpr::column(2).call_binary(MirScalarExpr::column(4), func::AddInt64),
685 /// MirScalarExpr::column(3).call_binary(MirScalarExpr::column(5), func::AddInt64),
686 /// ]).project(vec![6]);
687 ///
688 /// // Imagine we start with columns (b, x, a, y, c).
689 /// //
690 /// // The `partition` method requires a map from *expected* input columns to *actual*
691 /// // input columns. In the example above, the columns a, b, and c exist, and are at
692 /// // locations 2, 0, and 4 respectively. We must construct a map to this effect.
693 /// let mut available_columns = std::collections::BTreeMap::new();
694 /// available_columns.insert(0, 2);
695 /// available_columns.insert(1, 0);
696 /// available_columns.insert(2, 4);
697 /// // Partition `original` using the available columns and current input arity.
698 /// // This informs `partition` which columns are available, where they can be found,
699 /// // and how many columns are not relevant but should be preserved.
700 /// let (before, after) = original.partition(available_columns, 5);
701 ///
702 /// // `before` sees all five input columns, and should append `a + b + c`.
703 /// assert_eq!(before, MapFilterProject::new(5).map(vec![
704 /// MirScalarExpr::column(2).call_binary(MirScalarExpr::column(0), func::AddInt64),
705 /// MirScalarExpr::column(4).call_binary(MirScalarExpr::column(5), func::AddInt64),
706 /// ]).project(vec![0, 1, 2, 3, 4, 6]));
707 ///
708 /// // `after` expects to see `(a, b, c, d, a + b + c)`.
709 /// assert_eq!(after, MapFilterProject::new(5).map(vec![
710 /// MirScalarExpr::column(3).call_binary(MirScalarExpr::column(4), func::AddInt64)
711 /// ]).project(vec![5]));
712 ///
713 /// // To reconstruct `self`, we must introduce the columns that are not present,
714 /// // and present them in the order intended by `self`. In this example, we must
715 /// // introduce column d and permute the columns so that they begin (a, b, c, d).
716 /// // The columns x and y must be projected away, and any columns introduced by
717 /// // `begin` must be retained in their current order.
718 ///
719 /// // The `after` instance expects to be provided with all inputs, but it
720 /// // may not need all inputs. The `demand()` and `permute()` methods can
721 /// // optimize the representation.
722 /// ```
723 pub fn partition(self, available: BTreeMap<usize, usize>, input_arity: usize) -> (Self, Self) {
724 // Map expressions, filter predicates, and projections for `before` and `after`.
725 let mut before_expr = Vec::new();
726 let mut before_pred = Vec::new();
727 let mut before_proj = Vec::new();
728 let mut after_expr = Vec::new();
729 let mut after_pred = Vec::new();
730 let mut after_proj = Vec::new();
731
732 // Track which output columns must be preserved in the output of `before`.
733 let mut demanded = BTreeSet::new();
734 demanded.extend(0..self.input_arity);
735 demanded.extend(self.projection.iter());
736
737 // Determine which map expressions can be computed from the available subset.
738 // Some expressions may depend on other expressions, but by evaluating them
739 // in forward order we should accurately determine the available expressions.
740 let mut available_expr = vec![false; self.input_arity];
741 // Initialize available columns from `available`, which is then not used again.
742 for index in available.keys() {
743 available_expr[*index] = true;
744 }
745 for expr in self.expressions.into_iter() {
746 // We treat an expression as available if its supporting columns are available,
747 // and if it is not a literal (we want to avoid pushing down literals). This
748 // choice is ad-hoc, but the intent is that we partition the operators so
749 // that we can reduce the row representation size and total computation.
750 // Pushing down literals harms the former and does nothing for the latter.
751 // In the future, we'll want to have a harder think about this trade-off, as
752 // we are certainly making sub-optimal decisions by pushing down all available
753 // work.
754 // TODO(mcsherry): establish better principles about what work to push down.
755 let is_available = expr.support().into_iter().all(|i| available_expr[i])
756 && !OptimizableExpr::is_literal(&expr);
757 if is_available {
758 before_expr.push(expr);
759 } else {
760 demanded.extend(expr.support());
761 after_expr.push(expr);
762 }
763 available_expr.push(is_available);
764 }
765
766 // Determine which predicates can be computed from the available subset.
767 for (_when, pred) in self.predicates.into_iter() {
768 let is_available = pred.support().into_iter().all(|i| available_expr[i]);
769 if is_available {
770 before_pred.push(pred);
771 } else {
772 demanded.extend(pred.support());
773 after_pred.push(pred);
774 }
775 }
776
777 // Map from prior output location to location in un-projected `before`.
778 // This map is used to correct references in `before` but it should be
779 // adjusted to reflect `before`s projection prior to use in `after`.
780 let mut before_map = available;
781 // Input columns include any additional undescribed columns that may
782 // not be captured by the `available` argument, so we must independently
783 // track the current number of columns (vs relying on `before_map.len()`).
784 let mut input_columns = input_arity;
785 for index in self.input_arity..available_expr.len() {
786 if available_expr[index] {
787 before_map.insert(index, input_columns);
788 input_columns += 1;
789 }
790 }
791
792 // Permute the column references in `before` expressions and predicates.
793 for expr in before_expr.iter_mut() {
794 expr.permute_map(&before_map);
795 }
796 for pred in before_pred.iter_mut() {
797 pred.permute_map(&before_map);
798 }
799
800 // Demand information determines `before`s output projection.
801 // Specifically, we produce all input columns in the output, as well as
802 // any columns that are available and demanded.
803 before_proj.extend(0..input_arity);
804 for index in self.input_arity..available_expr.len() {
805 // If an intermediate result is both available and demanded,
806 // we should produce it as output.
807 if available_expr[index] && demanded.contains(&index) {
808 // Use the new location of `index`.
809 before_proj.push(before_map[&index]);
810 }
811 }
812
813 // Map from prior output locations to location in post-`before` columns.
814 // This map is used to correct references in `after`.
815 // The presumption is that `after` will be presented with all input columns,
816 // followed by the output columns introduced by `before` in order.
817 let mut after_map = BTreeMap::new();
818 for index in 0..self.input_arity {
819 after_map.insert(index, index);
820 }
821 for index in self.input_arity..available_expr.len() {
822 // If an intermediate result is both available and demanded,
823 // it was produced as output.
824 if available_expr[index] && demanded.contains(&index) {
825 // We expect to find the output as far after `self.input_arity` as
826 // it was produced after `input_arity` in the output of `before`.
827 let location = self.input_arity
828 + (before_proj
829 .iter()
830 .position(|x| x == &before_map[&index])
831 .unwrap()
832 - input_arity);
833 after_map.insert(index, location);
834 }
835 }
836 // We must now re-map the remaining non-demanded expressions, which are
837 // contiguous rather than potentially interspersed.
838 for index in self.input_arity..available_expr.len() {
839 if !available_expr[index] {
840 after_map.insert(index, after_map.len());
841 }
842 }
843
844 // Permute the column references in `after` expressions and predicates.
845 for expr in after_expr.iter_mut() {
846 expr.permute_map(&after_map);
847 }
848 for pred in after_pred.iter_mut() {
849 pred.permute_map(&after_map);
850 }
851 // Populate `after` projection with the new locations of `self.projection`.
852 for index in self.projection {
853 after_proj.push(after_map[&index]);
854 }
855
856 // Form and return the before and after MapFilterProject instances.
857 let before = Self::new(input_arity)
858 .map(before_expr)
859 .filter(before_pred)
860 .project(before_proj.clone());
861 let after = Self::new(self.input_arity + (before_proj.len() - input_arity))
862 .map(after_expr)
863 .filter(after_pred)
864 .project(after_proj);
865 (before, after)
866 }
867
868 /// Lists input columns whose values are used in outputs.
869 ///
870 /// You can use `BTreeSet::last()` to extract the maximum demanded column from the set.
871 ///
872 /// It is entirely appropriate to determine the demand of an instance
873 /// and then both apply a projection to the subject of the instance and
874 /// `self.permute` this instance.
875 pub fn demand(&self) -> BTreeSet<usize> {
876 let mut demanded = BTreeSet::new();
877 for (_index, pred) in self.predicates.iter() {
878 demanded.extend(pred.support());
879 }
880 demanded.extend(self.projection.iter().cloned());
881 for index in (0..self.expressions.len()).rev() {
882 if demanded.contains(&(self.input_arity + index)) {
883 demanded.extend(self.expressions[index].support());
884 }
885 }
886 demanded.retain(|col| col < &self.input_arity);
887 demanded
888 }
889
890 /// Update input column references, due to an input projection or permutation.
891 ///
892 /// The `shuffle` argument remaps expected column identifiers to new locations,
893 /// with the expectation that `shuffle` describes all input columns, and so the
894 /// intermediate results will be able to start at position `shuffle.len()`.
895 ///
896 /// The supplied `shuffle` might not list columns that are not "demanded" by the
897 /// instance, and so we should ensure that `self` is optimized to not reference
898 /// columns that are not demanded.
899 pub fn permute_fn<F>(&mut self, remap: F, new_input_arity: usize)
900 where
901 F: Fn(usize) -> usize,
902 {
903 let (mut map, mut filter, mut project) = self.as_map_filter_project();
904 let map_len = map.len();
905 let action = |col: &mut usize| {
906 if self.input_arity <= *col && *col < self.input_arity + map_len {
907 *col = new_input_arity + (*col - self.input_arity);
908 } else {
909 *col = remap(*col);
910 }
911 };
912 for expr in map.iter_mut() {
913 expr.visit_columns(action);
914 }
915 for pred in filter.iter_mut() {
916 pred.visit_columns(action);
917 }
918 for proj in project.iter_mut() {
919 action(proj);
920 assert!(*proj < new_input_arity + map.len());
921 }
922 *self = Self::new(new_input_arity)
923 .map(map)
924 .filter(filter)
925 .project(project)
926 }
927}
928
929// Optimization routines.
930impl<E: OptimizableExpr> MapFilterProject<E> {
931 /// Optimize the internal expression evaluation order.
932 ///
933 /// This method performs several optimizations that are meant to streamline
934 /// the execution of the `MapFilterProject` instance, but not to alter its
935 /// semantics. This includes extracting expressions that are used multiple
936 /// times, inlining those that are not, and removing expressions that are
937 /// unreferenced.
938 ///
939 /// This method will inline all temporal expressions, and remove any columns
940 /// that are not demanded by the output, which should transform any temporal
941 /// filters to a state where the temporal expressions exist only in the list
942 /// of predicates.
943 ///
944 /// # Example
945 ///
946 /// This example demonstrates how the re-use of one expression, converting
947 /// column 1 from a string to an integer, can be extracted and the results
948 /// shared among the two uses. This example is used for each of the steps
949 /// along the optimization path.
950 ///
951 /// ```rust
952 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
953 /// // Demonstrate extraction of common expressions (here: parsing strings).
954 /// let mut map_filter_project = MapFilterProject::new(5)
955 /// .map(vec![
956 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(1).call_unary(func::CastStringToInt64), func::AddInt64),
957 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
958 /// ])
959 /// .project(vec![3,4,5,6]);
960 ///
961 /// let mut expected_optimized = MapFilterProject::new(5)
962 /// .map(vec![
963 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
964 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(5), func::AddInt64),
965 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
966 /// ])
967 /// .project(vec![3,4,6,7]);
968 ///
969 /// // Optimize the expression.
970 /// map_filter_project.optimize();
971 ///
972 /// assert_eq!(
973 /// map_filter_project,
974 /// expected_optimized,
975 /// );
976 /// ```
977 pub fn optimize(&mut self) {
978 // Track sizes and iterate as long as they decrease.
979 let mut prev_size = None;
980 let mut self_size = usize::max_value();
981 // Continue as long as strict improvements occur.
982 while prev_size.map(|p| self_size < p).unwrap_or(true) {
983 // Lock in current size.
984 prev_size = Some(self_size);
985
986 // We have an annoying pattern of mapping literals that already exist as columns (by filters).
987 // Try to identify this pattern, of a map that introduces an expression equated to a prior column,
988 // and then replace the mapped expression by a column reference.
989 //
990 // We think this is due to `LiteralLifting`, and we might investigate removing the introduciton in
991 // the first place. The tell-tale that we see when we fix is a diff that look likes
992 //
993 // - Project (#0, #2)
994 // - Filter (#1 = 1)
995 // - Map (1)
996 // - Get l0
997 // + Filter (#1 = 1)
998 // + Get l0
999 //
1000 for (index, expr) in self.expressions.iter_mut().enumerate() {
1001 // If `expr` matches a filter equating it to a column < index + input_arity, rewrite it
1002 for (_, predicate) in self.predicates.iter() {
1003 if let Some(col) =
1004 E::equality_column_alias(predicate, expr, index + self.input_arity)
1005 {
1006 *expr = col;
1007 }
1008 }
1009 }
1010
1011 // Optimization memoizes individual `ScalarExpr` expressions that
1012 // are sure to be evaluated, canonicalizes references to the first
1013 // occurrence of each, inlines expressions that have a reference
1014 // count of one, and then removes any expressions that are not
1015 // referenced.
1016 self.memoize_expressions();
1017 self.predicates.sort();
1018 self.predicates.dedup();
1019 self.inline_expressions();
1020 self.remove_undemanded();
1021
1022 // Re-build `self` from parts to restore evaluation order invariants.
1023 let (map, filter, project) = self.as_map_filter_project();
1024 *self = Self::new(self.input_arity)
1025 .map(map)
1026 .filter(filter)
1027 .project(project);
1028
1029 self_size = self.size();
1030 }
1031 }
1032
1033 /// Total expression sizes across all expressions.
1034 pub fn size(&self) -> usize {
1035 self.expressions
1036 .iter()
1037 .map(|e| OptimizableExpr::size(e))
1038 .sum::<usize>()
1039 + self
1040 .predicates
1041 .iter()
1042 .map(|(_, e)| OptimizableExpr::size(e))
1043 .sum::<usize>()
1044 }
1045
1046 /// Place each certainly evaluated expression in its own column.
1047 ///
1048 /// This method places each non-trivial, certainly evaluated expression
1049 /// in its own column, and deduplicates them so that all references to
1050 /// the same expression reference the same column.
1051 ///
1052 /// This transformation is restricted to expressions we are certain will
1053 /// be evaluated, which does not include expressions in `if` statements.
1054 ///
1055 /// # Example
1056 ///
1057 /// This example demonstrates how memoization notices `MirScalarExpr`s
1058 /// that are used multiple times, and ensures that each are extracted
1059 /// into columns and then referenced by column. This pass does not try
1060 /// to minimize the occurrences of column references, which will happen
1061 /// in inlining.
1062 ///
1063 /// ```rust
1064 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1065 /// // Demonstrate extraction of common expressions (here: parsing strings).
1066 /// let mut map_filter_project = MapFilterProject::new(5)
1067 /// .map(vec![
1068 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(1).call_unary(func::CastStringToInt64), func::AddInt64),
1069 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1070 /// ])
1071 /// .project(vec![3,4,5,6]);
1072 ///
1073 /// let mut expected_optimized = MapFilterProject::new(5)
1074 /// .map(vec![
1075 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1076 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1077 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), func::AddInt64),
1078 /// MirScalarExpr::column(7),
1079 /// MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1080 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), func::AddInt64),
1081 /// MirScalarExpr::column(10),
1082 /// ])
1083 /// .project(vec![3,4,8,11]);
1084 ///
1085 /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1086 /// map_filter_project.memoize_expressions();
1087 ///
1088 /// assert_eq!(
1089 /// map_filter_project,
1090 /// expected_optimized,
1091 /// );
1092 /// ```
1093 ///
1094 /// Expressions may not be memoized if they are not certain to be evaluated,
1095 /// for example if they occur in conditional branches of a `MirScalarExpr::If`.
1096 ///
1097 /// ```rust
1098 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1099 /// // Demonstrate extraction of unconditionally evaluated expressions, as well as
1100 /// // the non-extraction of common expressions guarded by conditions.
1101 /// let mut map_filter_project = MapFilterProject::new(2)
1102 /// .map(vec![
1103 /// MirScalarExpr::If {
1104 /// cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt)),
1105 /// then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1106 /// els: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1107 /// },
1108 /// MirScalarExpr::If {
1109 /// cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt)),
1110 /// then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1111 /// els: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1112 /// },
1113 /// ]);
1114 ///
1115 /// let mut expected_optimized = MapFilterProject::new(2)
1116 /// .map(vec![
1117 /// MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt),
1118 /// MirScalarExpr::If {
1119 /// cond: Box::new(MirScalarExpr::column(2)),
1120 /// then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1121 /// els: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1122 /// },
1123 /// MirScalarExpr::column(3),
1124 /// MirScalarExpr::If {
1125 /// cond: Box::new(MirScalarExpr::column(2)),
1126 /// then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1127 /// els: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1128 /// },
1129 /// MirScalarExpr::column(5),
1130 /// ])
1131 /// .project(vec![0,1,4,6]);
1132 ///
1133 /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1134 /// map_filter_project.memoize_expressions();
1135 ///
1136 /// assert_eq!(
1137 /// map_filter_project,
1138 /// expected_optimized,
1139 /// );
1140 /// ```
1141 pub fn memoize_expressions(&mut self) {
1142 // Record the mapping from starting column references to new column
1143 // references.
1144 let mut remaps = BTreeMap::new();
1145 for index in 0..self.input_arity {
1146 remaps.insert(index, index);
1147 }
1148 let mut new_expressions = Vec::new();
1149
1150 // We follow the same order as for evaluation, to ensure that all
1151 // column references exist in time for their evaluation. We could
1152 // prioritize predicates, but we would need to be careful to chase
1153 // down column references to expressions and memoize those as well.
1154 let mut expression = 0;
1155 for (support, predicate) in self.predicates.iter_mut() {
1156 while self.input_arity + expression < *support {
1157 self.expressions[expression].permute_map(&remaps);
1158 memoize_expr(
1159 &mut self.expressions[expression],
1160 &mut new_expressions,
1161 self.input_arity,
1162 );
1163 remaps.insert(
1164 self.input_arity + expression,
1165 self.input_arity + new_expressions.len(),
1166 );
1167 new_expressions.push(self.expressions[expression].clone());
1168 expression += 1;
1169 }
1170 predicate.permute_map(&remaps);
1171 memoize_expr(predicate, &mut new_expressions, self.input_arity);
1172 }
1173 while expression < self.expressions.len() {
1174 self.expressions[expression].permute_map(&remaps);
1175 memoize_expr(
1176 &mut self.expressions[expression],
1177 &mut new_expressions,
1178 self.input_arity,
1179 );
1180 remaps.insert(
1181 self.input_arity + expression,
1182 self.input_arity + new_expressions.len(),
1183 );
1184 new_expressions.push(self.expressions[expression].clone());
1185 expression += 1;
1186 }
1187
1188 self.expressions = new_expressions;
1189 for proj in self.projection.iter_mut() {
1190 *proj = remaps[proj];
1191 }
1192
1193 // Restore predicate order invariants.
1194 for (pos, pred) in self.predicates.iter_mut() {
1195 *pos = pred.support().last().map(|x| *x + 1).unwrap_or(0);
1196 }
1197 }
1198
1199 /// This method inlines expressions with a single use.
1200 ///
1201 /// This method only inlines expressions; it does not delete expressions
1202 /// that are no longer referenced. The `remove_undemanded()` method does
1203 /// that, and should likely be used after this method.
1204 ///
1205 /// Inlining replaces column references when the referred-to item is either
1206 /// another column reference, or the only referrer of its referent. This
1207 /// is most common after memoization has atomized all expressions to seek
1208 /// out re-use: inlining re-assembles expressions that were not helpfully
1209 /// shared with other expressions.
1210 ///
1211 /// # Example
1212 ///
1213 /// In this example, we see that with only a single reference to columns
1214 /// 0 and 2, their parsing can each be inlined. Similarly, column references
1215 /// can be cleaned up among expressions, and in the final projection.
1216 ///
1217 /// Also notice the remaining expressions, which can be cleaned up in a later
1218 /// pass (the `remove_undemanded` method).
1219 ///
1220 /// ```rust
1221 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1222 /// // Use the output from first `memoize_expression` example.
1223 /// let mut map_filter_project = MapFilterProject::new(5)
1224 /// .map(vec![
1225 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1226 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1227 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), func::AddInt64),
1228 /// MirScalarExpr::column(7),
1229 /// MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1230 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), func::AddInt64),
1231 /// MirScalarExpr::column(10),
1232 /// ])
1233 /// .project(vec![3,4,8,11]);
1234 ///
1235 /// let mut expected_optimized = MapFilterProject::new(5)
1236 /// .map(vec![
1237 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1238 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1239 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1240 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1241 /// MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1242 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1243 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1244 /// ])
1245 /// .project(vec![3,4,8,11]);
1246 ///
1247 /// // Inline expressions that are referenced only once.
1248 /// map_filter_project.inline_expressions();
1249 ///
1250 /// assert_eq!(
1251 /// map_filter_project,
1252 /// expected_optimized,
1253 /// );
1254 /// ```
1255 pub fn inline_expressions(&mut self) {
1256 // Local copy of input_arity to avoid borrowing `self` in closures.
1257 let input_arity = self.input_arity;
1258 // Reference counts track the number of places that a reference occurs.
1259 let mut reference_count = vec![0; input_arity + self.expressions.len()];
1260 // Increment reference counts for each use
1261 for expr in self.expressions.iter() {
1262 expr.visit_pre(&mut |e| {
1263 if let Some(i) = e.as_column() {
1264 reference_count[i] += 1;
1265 }
1266 })
1267 .expect("visit_pre hit recursion limit");
1268 }
1269 for (_, pred) in self.predicates.iter() {
1270 pred.visit_pre(&mut |e| {
1271 if let Some(i) = e.as_column() {
1272 reference_count[i] += 1;
1273 }
1274 })
1275 .expect("visit_pre hit recursion limit");
1276 }
1277 for proj in self.projection.iter() {
1278 reference_count[*proj] += 1;
1279 }
1280
1281 // Determine which expressions should be inlined because they reference temporal expressions.
1282 let mut is_temporal = vec![false; input_arity];
1283 for expr in self.expressions.iter() {
1284 // An express may contain a temporal expression, or reference a column containing such.
1285 is_temporal.push(
1286 OptimizableExpr::contains_temporal(expr)
1287 || expr.support().into_iter().any(|col| is_temporal[col]),
1288 );
1289 }
1290
1291 // Inline only those columns that 1. are expressions not inputs, and
1292 // 2a. are column references or literals or 2b. have a refcount of 1,
1293 // or 2c. reference temporal expressions (which cannot be evaluated).
1294 let mut should_inline = vec![false; reference_count.len()];
1295 for i in (input_arity..reference_count.len()).rev() {
1296 if let Some(c) = self.expressions[i - input_arity].as_column() {
1297 should_inline[i] = true;
1298 // The reference count of the referenced column should be
1299 // incremented with the number of references
1300 // `self.expressions[i - input_arity]` has.
1301 // Subtract 1 because `self.expressions[i - input_arity]` is
1302 // itself a reference.
1303 reference_count[c] += reference_count[i] - 1;
1304 } else {
1305 should_inline[i] = reference_count[i] == 1 || is_temporal[i];
1306 }
1307 }
1308 // Inline expressions per `should_inline`.
1309 self.perform_inlining(should_inline);
1310 // We can only inline column references in `self.projection`, but we should.
1311 for proj in self.projection.iter_mut() {
1312 if *proj >= self.input_arity {
1313 if let Some(i) = self.expressions[*proj - self.input_arity].as_column() {
1314 // TODO(mgree) !!! propagate name information to projection
1315 *proj = i;
1316 }
1317 }
1318 }
1319 }
1320
1321 /// Inlines those expressions that are indicated by should_inline.
1322 /// See `inline_expressions` for usage.
1323 pub fn perform_inlining(&mut self, should_inline: Vec<bool>) {
1324 for index in 0..self.expressions.len() {
1325 let (prior, expr) = self.expressions.split_at_mut(index);
1326 expr[0]
1327 .visit_mut_post(&mut |e| {
1328 if let Some(i) = e.as_column() {
1329 if should_inline[i] {
1330 *e = prior[i - self.input_arity].clone();
1331 }
1332 }
1333 })
1334 .expect("inlining hit recursion limit");
1335 }
1336 for (_index, pred) in self.predicates.iter_mut() {
1337 let expressions = &self.expressions;
1338 pred.visit_mut_post(&mut |e| {
1339 if let Some(i) = e.as_column() {
1340 if should_inline[i] {
1341 *e = expressions[i - self.input_arity].clone();
1342 }
1343 }
1344 })
1345 .expect("inlining hit recursion limit");
1346 }
1347 }
1348
1349 /// Removes unused expressions from `self.expressions`.
1350 ///
1351 /// Expressions are "used" if they are relied upon by any output columns
1352 /// or any predicates, even transitively. Any expressions that are not
1353 /// relied upon in this way can be discarded.
1354 ///
1355 /// # Example
1356 ///
1357 /// ```rust
1358 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1359 /// // Use the output from `inline_expression` example.
1360 /// let mut map_filter_project = MapFilterProject::new(5)
1361 /// .map(vec![
1362 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1363 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1364 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1365 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1366 /// MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1367 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1368 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1369 /// ])
1370 /// .project(vec![3,4,8,11]);
1371 ///
1372 /// let mut expected_optimized = MapFilterProject::new(5)
1373 /// .map(vec![
1374 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1375 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(5), func::AddInt64),
1376 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1377 /// ])
1378 /// .project(vec![3,4,6,7]);
1379 ///
1380 /// // Remove undemanded expressions, streamlining the work done..
1381 /// map_filter_project.remove_undemanded();
1382 ///
1383 /// assert_eq!(
1384 /// map_filter_project,
1385 /// expected_optimized,
1386 /// );
1387 /// ```
1388 pub fn remove_undemanded(&mut self) {
1389 // Determine the demanded expressions to remove irrelevant ones.
1390 let mut demand = BTreeSet::new();
1391 for (_index, pred) in self.predicates.iter() {
1392 demand.extend(pred.support());
1393 }
1394 // Start from the output columns as presumed demanded.
1395 // If this is not the case, the caller should project some away.
1396 demand.extend(self.projection.iter().cloned());
1397 // Proceed in *reverse* order, as expressions may depend on other
1398 // expressions that precede them.
1399 for index in (0..self.expressions.len()).rev() {
1400 if demand.contains(&(self.input_arity + index)) {
1401 demand.extend(self.expressions[index].support());
1402 }
1403 }
1404
1405 // Maintain a map from initial column identifiers to locations
1406 // once we have removed undemanded expressions.
1407 let mut remap = BTreeMap::new();
1408 // This map only needs to map elements of `demand` to a new location,
1409 // but the logic is easier if we include all input columns (as the
1410 // new position is then determined by the size of the map).
1411 for index in 0..self.input_arity {
1412 remap.insert(index, index);
1413 }
1414 // Retain demanded expressions, and record their new locations.
1415 let mut new_expressions = Vec::new();
1416 for (index, expr) in self.expressions.drain(..).enumerate() {
1417 if demand.contains(&(index + self.input_arity)) {
1418 remap.insert(index + self.input_arity, remap.len());
1419 new_expressions.push(expr);
1420 }
1421 }
1422 self.expressions = new_expressions;
1423
1424 // Update column identifiers; rebuild `Self` to re-establish any invariants.
1425 // We mirror `self.permute(&remap)` but we specifically want to remap columns
1426 // that are produced by `self.expressions` after the input columns.
1427 let (expressions, predicates, projection) = self.as_map_filter_project();
1428 *self = Self::new(self.input_arity)
1429 .map(expressions.into_iter().map(|mut e| {
1430 e.permute_map(&remap);
1431 e
1432 }))
1433 .filter(predicates.into_iter().map(|mut p| {
1434 p.permute_map(&remap);
1435 p
1436 }))
1437 .project(projection.into_iter().map(|c| remap[&c]));
1438 }
1439}
1440
1441// TODO: move this elsewhere?
1442/// Recursively memoize parts of `expr`, storing those parts in `memoized_parts`.
1443///
1444/// A part of `expr` that is memoized is replaced by a reference to column
1445/// `(input_arity + pos)`, where `pos` is the position of the memoized part in
1446/// `memoized_parts`, and `input_arity` is the arity of the input that `expr`
1447/// refers to.
1448pub fn memoize_expr<E: OptimizableExpr>(
1449 expr: &mut E,
1450 memoized_parts: &mut Vec<E>,
1451 input_arity: usize,
1452) {
1453 #[allow(deprecated)]
1454 expr.visit_mut_pre_post(&mut |e| e.eager_children(), &mut |e| {
1455 if E::is_literal(e) {
1456 // Literals do not need to be memoized.
1457 return;
1458 }
1459 if let Some(col) = e.as_column_mut() {
1460 // Column references do not need to be memoized, but may need to be
1461 // updated if they reference a column reference themselves.
1462 if *col > input_arity {
1463 if let Some(col2) = memoized_parts[*col - input_arity].as_column() {
1464 // Update the column index in place, preserving any name information.
1465 *col = col2;
1466 }
1467 }
1468 return;
1469 }
1470 // TODO: OOO (Optimizer Optimization Opportunity):
1471 // we are quadratic in expression size because of this .iter().position
1472 if let Some(position) = memoized_parts.iter().position(|e2| e2 == e) {
1473 // Any complex expression that already exists as a prior column can
1474 // be replaced by a reference to that column.
1475 *e = E::column(input_arity + position);
1476 } else {
1477 // A complex expression that does not exist should be memoized, and
1478 // replaced by a reference to the column.
1479 memoized_parts.push(std::mem::replace(
1480 e,
1481 E::column(input_arity + memoized_parts.len()),
1482 ));
1483 }
1484 })
1485 .expect("memoize_expr hit recursion limit");
1486}
1487
1488pub mod util {
1489 use std::collections::BTreeMap;
1490
1491 use crate::MirScalarExpr;
1492 use crate::scalar::columns::Columns;
1493
1494 #[allow(dead_code)]
1495 /// A triple of actions that map from rows to (key, val) pairs and back again.
1496 struct KeyValRowMapping {
1497 /// Expressions to apply to a row to produce key datums.
1498 to_key: Vec<MirScalarExpr>,
1499 /// Columns to project from a row to produce residual value datums.
1500 to_val: Vec<usize>,
1501 /// Columns to project from the concatenation of key and value to reconstruct the row.
1502 to_row: Vec<usize>,
1503 }
1504
1505 /// Derive supporting logic to support transforming rows to (key, val) pairs,
1506 /// and back again.
1507 ///
1508 /// We are given as input a list of mappings from columns to key indices, a key length,
1509 /// and an input arity. (The produced key should be the application of the key expressions.)
1510 ///
1511 /// To produce the `val` output, we will identify those input columns not found in
1512 /// the key expressions, and name all other columns.
1513 /// To reconstitute the original row, we identify the sequence of columns from the
1514 /// concatenation of key and val which would reconstruct the original row.
1515 ///
1516 /// The output is a pair of column sequences, the first used to reconstruct a row
1517 /// from the concatenation of key and value, and the second to identify the columns
1518 /// of a row that should become the value associated with its key.
1519 ///
1520 /// The permutations and thinning expressions generated here will be tracked in
1521 /// `compute_types::plan::AvailableCollections`; see the
1522 /// documentation there for more details.
1523 pub fn permutation_for_arrangement(
1524 key: &[impl Columns],
1525 unthinned_arity: usize,
1526 ) -> (Vec<usize>, Vec<usize>) {
1527 let columns_in_key: BTreeMap<_, _> = key
1528 .iter()
1529 .enumerate()
1530 .filter_map(|(i, key_col)| key_col.as_column().map(|c| (c, i)))
1531 .collect();
1532
1533 let mut input_cursor = key.len();
1534 let permutation = (0..unthinned_arity)
1535 .map(|c| {
1536 if let Some(c) = columns_in_key.get(&c) {
1537 // Column is in key (and thus gone from the value
1538 // of the thinned representation)
1539 *c
1540 } else {
1541 // Column remains in value of the thinned representation
1542 input_cursor += 1;
1543 input_cursor - 1
1544 }
1545 })
1546 .collect();
1547 let thinning = (0..unthinned_arity)
1548 .filter(|c| !columns_in_key.contains_key(c))
1549 .collect();
1550 (permutation, thinning)
1551 }
1552
1553 /// Given the permutations (see [`permutation_for_arrangement`] and
1554 /// (`dataflow::plan::AvailableCollections`) corresponding to two
1555 /// collections with the same key arity,
1556 /// computes the permutation for the result of joining them.
1557 pub fn join_permutations(
1558 key_arity: usize,
1559 stream_permutation: Vec<usize>,
1560 thinned_stream_arity: usize,
1561 lookup_permutation: Vec<usize>,
1562 ) -> BTreeMap<usize, usize> {
1563 let stream_arity = stream_permutation.len();
1564 let lookup_arity = lookup_permutation.len();
1565
1566 (0..stream_arity + lookup_arity)
1567 .map(|i| {
1568 let location = if i < stream_arity {
1569 stream_permutation[i]
1570 } else {
1571 let location_in_lookup = lookup_permutation[i - stream_arity];
1572 if location_in_lookup < key_arity {
1573 location_in_lookup
1574 } else {
1575 location_in_lookup + thinned_stream_arity
1576 }
1577 };
1578 (i, location)
1579 })
1580 .collect()
1581 }
1582}
1583
1584pub mod plan {
1585 use std::iter;
1586
1587 use mz_repr::{Datum, Diff, Row, RowArena};
1588 use serde::{Deserialize, Serialize};
1589
1590 use crate::Eval;
1591 use crate::scalar::optimizable::OptimizableExpr;
1592 use crate::{EvalError, MapFilterProject, MirScalarExpr};
1593
1594 /// A wrapper type which indicates it is safe to simply evaluate all expressions.
1595 #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
1596 #[serde(bound(deserialize = "E: serde::de::DeserializeOwned"))]
1597 pub struct SafeMfpPlan<E: OptimizableExpr = MirScalarExpr> {
1598 pub(crate) mfp: MapFilterProject<E>,
1599 }
1600
1601 impl<E: OptimizableExpr> SafeMfpPlan<E> {
1602 /// Wrap a `MapFilterProject` in a `SafeMfpPlan`.
1603 pub fn from_mfp(mfp: MapFilterProject<E>) -> Self {
1604 Self { mfp }
1605 }
1606
1607 /// Unwrap the inner `MapFilterProject`.
1608 pub fn into_mfp(self) -> MapFilterProject<E> {
1609 self.mfp
1610 }
1611
1612 /// Remaps references to input columns according to `remap`.
1613 ///
1614 /// Leaves other column references, e.g. to newly mapped columns, unchanged.
1615 pub fn permute_fn<F>(&mut self, remap: F, new_arity: usize)
1616 where
1617 F: Fn(usize) -> usize,
1618 {
1619 self.mfp.permute_fn(remap, new_arity);
1620 }
1621
1622 /// Returns true when `Self` is the identity.
1623 pub fn is_identity(&self) -> bool {
1624 self.mfp.is_identity()
1625 }
1626 }
1627
1628 impl<E: OptimizableExpr + Eval> SafeMfpPlan<E> {
1629 /// Evaluates the linear operator on a supplied list of datums.
1630 ///
1631 /// The arguments are the initial datums associated with the row,
1632 /// and an appropriately lifetimed arena for temporary allocations
1633 /// needed by scalar evaluation.
1634 ///
1635 /// An `Ok` result will either be `None` if any predicate did not
1636 /// evaluate to `Datum::True`, or the values of the columns listed
1637 /// by `self.projection` if all predicates passed. If an error
1638 /// occurs in the evaluation it is returned as an `Err` variant.
1639 /// As the evaluation exits early with failed predicates, it may
1640 /// miss some errors that would occur later in evaluation.
1641 ///
1642 /// The `row` is not cleared first, but emptied if the function
1643 /// returns `Ok(Some(row)).
1644 #[inline(always)]
1645 pub fn evaluate_into<'a, 'row>(
1646 &'a self,
1647 datums: &mut Vec<Datum<'a>>,
1648 arena: &'a RowArena,
1649 row_buf: &'row mut Row,
1650 ) -> Result<Option<&'row Row>, EvalError> {
1651 let passed_predicates = self.evaluate_inner(datums, arena)?;
1652 if !passed_predicates {
1653 Ok(None)
1654 } else {
1655 row_buf
1656 .packer()
1657 .extend(self.mfp.projection.iter().map(|c| datums[*c]));
1658 Ok(Some(row_buf))
1659 }
1660 }
1661
1662 /// A version of `evaluate` which produces an iterator over `Datum`
1663 /// as output.
1664 ///
1665 /// This version can be useful when one wants to capture the resulting
1666 /// datums without packing and then unpacking a row.
1667 #[inline(always)]
1668 pub fn evaluate_iter<'b, 'a: 'b>(
1669 &'a self,
1670 datums: &'b mut Vec<Datum<'a>>,
1671 arena: &'a RowArena,
1672 ) -> Result<Option<impl Iterator<Item = Datum<'a>> + 'b>, EvalError> {
1673 let passed_predicates = self.evaluate_inner(datums, arena)?;
1674 if !passed_predicates {
1675 Ok(None)
1676 } else {
1677 Ok(Some(self.mfp.projection.iter().map(move |i| datums[*i])))
1678 }
1679 }
1680
1681 /// Populates `datums` with `self.expressions` and tests `self.predicates`.
1682 ///
1683 /// This does not apply `self.projection`, which is up to the calling method.
1684 pub fn evaluate_inner<'b, 'a: 'b>(
1685 &'a self,
1686 datums: &'b mut Vec<Datum<'a>>,
1687 arena: &'a RowArena,
1688 ) -> Result<bool, EvalError> {
1689 let mut expression = 0;
1690 for (support, predicate) in self.mfp.predicates.iter() {
1691 while self.mfp.input_arity + expression < *support {
1692 datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
1693 expression += 1;
1694 }
1695 if predicate.eval(&datums[..], arena)? != Datum::True {
1696 return Ok(false);
1697 }
1698 }
1699 while expression < self.mfp.expressions.len() {
1700 datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
1701 expression += 1;
1702 }
1703 Ok(true)
1704 }
1705
1706 /// Returns true if evaluation could introduce an error on non-error inputs.
1707 pub fn could_error(&self) -> bool {
1708 self.mfp.predicates.iter().any(|(_pos, e)| e.could_error())
1709 || self.mfp.expressions.iter().any(|e| e.could_error())
1710 }
1711 }
1712
1713 impl<E: OptimizableExpr> std::ops::Deref for SafeMfpPlan<E> {
1714 type Target = MapFilterProject<E>;
1715 fn deref(&self) -> &Self::Target {
1716 &self.mfp
1717 }
1718 }
1719
    /// Predicates partitioned into temporal and non-temporal.
    ///
    /// Temporal predicates require some recognition to determine their
    /// structure, and it is best to do that once and re-use the results.
    ///
    /// There are restrictions on the temporal predicates we currently support.
    /// They must directly constrain `MzNow` from below or above,
    /// by expressions that do not themselves contain `MzNow`.
    /// Conjunctions of such constraints are also ok.
    ///
    /// Typically built with `MfpPlan::create_from`, which performs the partitioning.
    #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
    #[serde(bound(deserialize = "E: serde::de::DeserializeOwned"))]
    pub struct MfpPlan<E: OptimizableExpr = MirScalarExpr> {
        /// The non-temporal part of the plan. It holds:
        /// * the map expressions;
        /// * the normal predicates, which are evaluated on `&[Datum]` and must
        ///   yield `Ok(Datum::True)`;
        /// * the output projection.
        pub(crate) mfp: SafeMfpPlan<E>,
        /// Expressions that when evaluated lower-bound or equal (<=) `MzNow`.
        ///
        /// Each is expected to evaluate to `Datum::MzTimestamp` or `Datum::Null`.
        /// A record becomes valid at the maximum of these values and its input time.
        pub(crate) lower_bounds: Vec<E>,
        /// Expressions that when evaluated strictly upper-bound `MzNow`.
        ///
        /// Each is expected to evaluate to `Datum::MzTimestamp` or `Datum::Null`.
        /// A record is retracted at the minimum of these values, clamped to be no
        /// earlier than its lower bound.
        pub(crate) upper_bounds: Vec<E>,
    }
1739
1740 impl<E: OptimizableExpr> MfpPlan<E> {
1741 /// Construct an `MfpPlan` from its components.
1742 pub fn from_parts(mfp: SafeMfpPlan<E>, lower_bounds: Vec<E>, upper_bounds: Vec<E>) -> Self {
1743 Self {
1744 mfp,
1745 lower_bounds,
1746 upper_bounds,
1747 }
1748 }
1749
1750 /// Deconstruct into components.
1751 pub fn into_parts(self) -> (SafeMfpPlan<E>, Vec<E>, Vec<E>) {
1752 (self.mfp, self.lower_bounds, self.upper_bounds)
1753 }
1754
1755 /// Access the inner `SafeMfpPlan`.
1756 pub fn safe_mfp(&self) -> &SafeMfpPlan<E> {
1757 &self.mfp
1758 }
1759
1760 /// Borrow all parts: `(safe_mfp, lower_bounds, upper_bounds)`.
1761 pub fn as_parts(&self) -> (&SafeMfpPlan<E>, &[E], &[E]) {
1762 (&self.mfp, &self.lower_bounds, &self.upper_bounds)
1763 }
1764
1765 /// Partitions `predicates` into non-temporal, and lower and upper temporal bounds.
1766 ///
1767 /// The first returned list is of predicates that do not contain `mz_now`.
1768 /// The second and third returned lists contain expressions that, once evaluated, lower
1769 /// and upper bound the validity interval of a record, respectively. These second two
1770 /// lists are populated only by binary expressions of the form
1771 /// ```ignore
1772 /// mz_now cmp_op expr
1773 /// ```
1774 /// where `cmp_op` is a comparison operator and `expr` does not contain `mz_now`.
1775 ///
1776 /// If any unsupported expression is found, for example one that uses `mz_now`
1777 /// in an unsupported position, an error is returned.
1778 pub fn create_from(mut mfp: MapFilterProject<E>) -> Result<Self, String> {
1779 mfp.optimize();
1780
1781 let mut temporal = Vec::new();
1782 mfp.predicates.retain(|(_position, predicate)| {
1783 if OptimizableExpr::contains_temporal(predicate) {
1784 temporal.push(predicate.clone());
1785 false
1786 } else {
1787 true
1788 }
1789 });
1790
1791 let (lower_bounds, upper_bounds) = E::extract_temporal_bounds(temporal)?;
1792
1793 Ok(Self {
1794 mfp: SafeMfpPlan { mfp },
1795 lower_bounds,
1796 upper_bounds,
1797 })
1798 }
1799
1800 /// Indicates if the planned `MapFilterProject` emits exactly its inputs as outputs.
1801 pub fn is_identity(&self) -> bool {
1802 self.mfp.mfp.is_identity()
1803 && self.lower_bounds.is_empty()
1804 && self.upper_bounds.is_empty()
1805 }
1806
1807 /// Returns `true` if the plan contains temporal bounds
1808 /// (i.e., predicates involving `mz_now()`).
1809 pub fn has_temporal_bounds(&self) -> bool {
1810 !self.lower_bounds.is_empty() || !self.upper_bounds.is_empty()
1811 }
1812
1813 /// Returns `self`, and leaves behind an identity operator that acts on its output.
1814 pub fn take(&mut self) -> Self {
1815 let mut identity = Self {
1816 mfp: SafeMfpPlan {
1817 mfp: MapFilterProject::new(self.mfp.projection.len()),
1818 },
1819 lower_bounds: Default::default(),
1820 upper_bounds: Default::default(),
1821 };
1822 std::mem::swap(self, &mut identity);
1823 identity
1824 }
1825
1826 /// Attempt to convert self into a non-temporal MapFilterProject plan.
1827 ///
1828 /// If that is not possible, the original instance is returned as an error.
1829 #[allow(clippy::result_large_err)]
1830 pub fn into_nontemporal(self) -> Result<SafeMfpPlan<E>, Self> {
1831 if self.lower_bounds.is_empty() && self.upper_bounds.is_empty() {
1832 Ok(self.mfp)
1833 } else {
1834 Err(self)
1835 }
1836 }
1837
1838 /// Returns an iterator over mutable references to all non-temporal
1839 /// scalar expressions in the plan.
1840 ///
1841 /// The order of iteration is unspecified.
1842 pub fn iter_nontemporal_exprs(&mut self) -> impl Iterator<Item = &mut E> {
1843 iter::empty()
1844 .chain(self.mfp.mfp.predicates.iter_mut().map(|(_, expr)| expr))
1845 .chain(&mut self.mfp.mfp.expressions)
1846 .chain(&mut self.lower_bounds)
1847 .chain(&mut self.upper_bounds)
1848 }
1849
1850 /// Indicates that `Self` ignores its input to the extent that it can be evaluated on `&[]`.
1851 ///
1852 /// At the moment, this is only true if it projects away all columns and applies no filters,
1853 /// but it could be extended to plans that produce literals independent of the input.
1854 pub fn ignores_input(&self) -> bool {
1855 self.lower_bounds.is_empty()
1856 && self.upper_bounds.is_empty()
1857 && self.mfp.mfp.projection.is_empty()
1858 && self.mfp.mfp.predicates.is_empty()
1859 }
1860 }
1861
    impl<E: OptimizableExpr + Eval> MfpPlan<E> {
        /// Evaluate the predicates, temporal and non-, and return times and differences for `datums`.
        ///
        /// On entry `datums` holds the initial values of the row. Evaluation
        /// appends the values of the plan's map expressions to it, and the
        /// bound expressions are then evaluated against the extended `datums`.
        ///
        /// Possible outcomes:
        /// * If `self` contains only non-temporal predicates, the result is
        ///   `(time, diff)`, an evaluation error, or nothing (when a predicate
        ///   fails or `time` is not valid).
        /// * If `self` contains temporal predicates, the results can be times
        ///   that are greater than the input `time`, and may contain negated
        ///   `diff` values.
        ///
        /// Evaluation errors are reported as `Err((error, time, diff))` at the
        /// input `time`. A `Datum::Null` from any evaluated bound suppresses all
        /// output for the record.
        ///
        /// `valid_time` decides whether a time may appear in the output:
        /// * if the lower bound fails it, nothing is produced;
        /// * if the upper bound fails it, the retraction at the upper bound is
        ///   dropped.
        ///
        /// # Panics
        ///
        /// Panics if a bound expression evaluates to anything other than
        /// `Datum::MzTimestamp` or `Datum::Null`.
        ///
        /// The `row_builder` is not cleared first, but emptied if the function
        /// returns an iterator with any `Ok(_)` element.
        // Note: the type parameter `Err` (the caller's error type) lives in the
        // type namespace; the `Err(..)` patterns and constructors in the body
        // still refer to `Result::Err`.
        pub fn evaluate<'b, 'a: 'b, Err: From<EvalError>, V: Fn(&mz_repr::Timestamp) -> bool>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
            time: mz_repr::Timestamp,
            diff: Diff,
            valid_time: V,
            row_builder: &mut Row,
        ) -> impl Iterator<
            Item = Result<(Row, mz_repr::Timestamp, Diff), (Err, mz_repr::Timestamp, Diff)>,
        > + use<Err, V, E> {
            // Evaluate the non-temporal part first. Every return path yields the
            // same concrete `Chain<option::IntoIter, option::IntoIter>` type, as
            // required by the single `impl Iterator` return type.
            match self.mfp.evaluate_inner(datums, arena) {
                Err(e) => {
                    return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                }
                Ok(true) => {}
                Ok(false) => {
                    return None.into_iter().chain(None);
                }
            }

            // Lower and upper bounds.
            let mut lower_bound = time;
            let mut upper_bound = None;

            // Track whether we have seen a null in either bound, as this should
            // prevent the record from being produced at any time.
            let mut null_eval = false;

            // Advance our lower bound to be at least the result of any lower bound
            // expressions.
            for l in self.lower_bounds.iter() {
                match l.eval(datums, arena) {
                    Err(e) => {
                        return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                    }
                    Ok(Datum::MzTimestamp(d)) => {
                        lower_bound = lower_bound.max(d);
                    }
                    Ok(Datum::Null) => {
                        null_eval = true;
                    }
                    x => {
                        panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
                    }
                }
            }

            // If the lower bound exceeds our `until` frontier, it should not appear in the output.
            if !valid_time(&lower_bound) {
                return None.into_iter().chain(None);
            }

            // If there are any upper bounds, determine the minimum upper bound.
            for u in self.upper_bounds.iter() {
                // We can cease as soon as the lower and upper bounds match,
                // as the update will certainly not be produced in that case.
                // (Remaining upper bounds are then not evaluated, so any errors
                // they would raise are not reported.)
                if upper_bound != Some(lower_bound) {
                    match u.eval(datums, arena) {
                        Err(e) => {
                            return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                        }
                        Ok(Datum::MzTimestamp(d)) => {
                            if let Some(upper) = upper_bound {
                                upper_bound = Some(upper.min(d));
                            } else {
                                upper_bound = Some(d);
                            };
                            // Force the upper bound to be at least the lower
                            // bound. The `is_some()` test should always be true
                            // due to the above block, but maintain it here in
                            // case that changes. It's hopefully optimized away.
                            if upper_bound.is_some() && upper_bound < Some(lower_bound) {
                                upper_bound = Some(lower_bound);
                            }
                        }
                        Ok(Datum::Null) => {
                            null_eval = true;
                        }
                        x => {
                            panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
                        }
                    }
                }
            }

            // If the upper bound exceeds our `until` frontier, it should not appear in the output.
            // Dropping it means only the insertion at `lower_bound` is emitted.
            if let Some(upper) = &mut upper_bound {
                if !valid_time(upper) {
                    upper_bound = None;
                }
            }

            // Produce an output only if the upper bound exceeds the lower bound,
            // and if we did not encounter a `null` in our evaluation.
            // The record is inserted at `lower_bound` with `diff` and, if an
            // upper bound remains, retracted there with `-diff`.
            if Some(lower_bound) != upper_bound && !null_eval {
                row_builder
                    .packer()
                    .extend(self.mfp.mfp.projection.iter().map(|c| datums[*c]));
                let upper_opt =
                    upper_bound.map(|upper_bound| Ok((row_builder.clone(), upper_bound, -diff)));
                let lower = Some(Ok((row_builder.clone(), lower_bound, diff)));
                lower.into_iter().chain(upper_opt)
            } else {
                None.into_iter().chain(None)
            }
        }

        /// Returns true if evaluation could introduce an error on non-error inputs.
        ///
        /// Considers the non-temporal plan as well as every lower and upper bound expression.
        pub fn could_error(&self) -> bool {
            self.mfp.could_error()
                || self.lower_bounds.iter().any(|e| e.could_error())
                || self.upper_bounds.iter().any(|e| e.could_error())
        }
    }
1986}