mz_expr/linear.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9use std::collections::{BTreeMap, BTreeSet};
10use std::fmt::Display;
11
12use mz_repr::{Datum, Row};
13use serde::{Deserialize, Serialize};
14
15use crate::visit::Visit;
16use crate::{MirRelationExpr, MirScalarExpr};
17
/// A compound operator that can be applied row-by-row.
///
/// This operator integrates the map, filter, and project operators.
/// It applies a sequences of map expressions, which are allowed to
/// refer to previous expressions, interleaved with predicates which
/// must be satisfied for an output to be produced. If all predicates
/// evaluate to `Datum::True` the data at the identified columns are
/// collected and produced as output in a packed `Row`.
///
/// This operator is a "builder" and its contents may contain expressions
/// that are not yet executable. For example, it may contain temporal
/// expressions in `self.expressions`, even though this is not something
/// we can directly evaluate. The plan creation methods will defensively
/// ensure that the right thing happens.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Ord, PartialOrd)]
pub struct MapFilterProject {
    /// A sequence of expressions that should be appended to the row.
    ///
    /// Many of these expressions may not be produced in the output,
    /// and may only be present as common subexpressions.
    pub expressions: Vec<MirScalarExpr>,
    /// Expressions that must evaluate to `Datum::True` for the output
    /// row to be produced.
    ///
    /// Each entry is prepended with a column identifier indicating
    /// the column *before* which the predicate should first be applied.
    /// Most commonly this would be one plus the largest column identifier
    /// in the predicate's support, but it could be larger to implement
    /// guarded evaluation of predicates.
    ///
    /// This list should be sorted by the first field.
    pub predicates: Vec<(usize, MirScalarExpr)>,
    /// A sequence of column identifiers whose data form the output row.
    ///
    /// Identifiers in `0..input_arity` name input columns, and identifiers
    /// at or beyond `input_arity` name columns formed by `expressions`.
    pub projection: Vec<usize>,
    /// The expected number of input columns.
    ///
    /// This is needed to ensure correct identification of newly formed
    /// columns in the output.
    pub input_arity: usize,
}
58
59impl Display for MapFilterProject {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 writeln!(f, "MapFilterProject(")?;
62 writeln!(f, " expressions:")?;
63 self.expressions
64 .iter()
65 .enumerate()
66 .try_for_each(|(i, e)| writeln!(f, " #{} <- {},", i + self.input_arity, e))?;
67 writeln!(f, " predicates:")?;
68 self.predicates
69 .iter()
70 .try_for_each(|(before, p)| writeln!(f, " <before: {}> {},", before, p))?;
71 writeln!(f, " projection: {:?}", self.projection)?;
72 writeln!(f, " input_arity: {}", self.input_arity)?;
73 writeln!(f, ")")
74 }
75}
76
77impl MapFilterProject {
78 /// Create a no-op operator for an input of a supplied arity.
79 pub fn new(input_arity: usize) -> Self {
80 Self {
81 expressions: Vec::new(),
82 predicates: Vec::new(),
83 projection: (0..input_arity).collect(),
84 input_arity,
85 }
86 }
87
88 /// Given two mfps, return an mfp that applies one
89 /// followed by the other.
90 /// Note that the arguments are in the opposite order
91 /// from how function composition is usually written in mathematics.
92 pub fn compose(before: Self, after: Self) -> Self {
93 let (m, f, p) = after.into_map_filter_project();
94 before.map(m).filter(f).project(p)
95 }
96
97 /// True if the operator describes the identity transformation.
98 pub fn is_identity(&self) -> bool {
99 self.expressions.is_empty()
100 && self.predicates.is_empty()
101 && self.projection.len() == self.input_arity
102 && self.projection.iter().enumerate().all(|(i, p)| i == *p)
103 }
104
105 /// Retain only the indicated columns in the presented order.
106 pub fn project<I>(mut self, columns: I) -> Self
107 where
108 I: IntoIterator<Item = usize> + std::fmt::Debug,
109 {
110 self.projection = columns.into_iter().map(|c| self.projection[c]).collect();
111 self
112 }
113
114 /// Retain only rows satisfying these predicates.
115 ///
116 /// This method introduces predicates as eagerly as they can be evaluated,
117 /// which may not be desired for predicates that may cause exceptions.
118 /// If fine manipulation is required, the predicates can be added manually.
119 pub fn filter<I>(mut self, predicates: I) -> Self
120 where
121 I: IntoIterator<Item = MirScalarExpr>,
122 {
123 for mut predicate in predicates {
124 // Correct column references.
125 predicate.permute(&self.projection[..]);
126
127 // Validate column references.
128 assert!(
129 predicate
130 .support()
131 .into_iter()
132 .all(|c| c < self.input_arity + self.expressions.len())
133 );
134
135 // Insert predicate as eagerly as it can be evaluated:
136 // just after the largest column in its support is formed.
137 let max_support = predicate
138 .support()
139 .into_iter()
140 .max()
141 .map(|c| c + 1)
142 .unwrap_or(0);
143 self.predicates.push((max_support, predicate))
144 }
145 // Stable sort predicates by position at which they take effect.
146 // We put literal errors at the end as a stop-gap to avoid erroring
147 // before we are able to evaluate any predicates that might prevent it.
148 self.predicates
149 .sort_by_key(|(position, predicate)| (predicate.is_literal_err(), *position));
150 self
151 }
152
153 /// Append the result of evaluating expressions to each row.
154 pub fn map<I>(mut self, expressions: I) -> Self
155 where
156 I: IntoIterator<Item = MirScalarExpr>,
157 {
158 for mut expression in expressions {
159 // Correct column references.
160 expression.permute(&self.projection[..]);
161
162 // Validate column references.
163 assert!(
164 expression
165 .support()
166 .into_iter()
167 .all(|c| c < self.input_arity + self.expressions.len())
168 );
169
170 // Introduce expression and produce as output.
171 self.expressions.push(expression);
172 self.projection
173 .push(self.input_arity + self.expressions.len() - 1);
174 }
175
176 self
177 }
178
179 /// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning.
180 pub fn into_map_filter_project(self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
181 let predicates = self
182 .predicates
183 .into_iter()
184 .map(|(_pos, predicate)| predicate)
185 .collect();
186 (self.expressions, predicates, self.projection)
187 }
188
189 /// As the arguments to `Map`, `Filter`, and `Project` operators.
190 ///
191 /// In principle, this operator can be implemented as a sequence of
192 /// more elemental operators, likely less efficiently.
193 pub fn as_map_filter_project(&self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
194 self.clone().into_map_filter_project()
195 }
196
197 /// Determines if a scalar expression must be equal to a literal datum.
198 pub fn literal_constraint(&self, expr: &MirScalarExpr) -> Option<Datum<'_>> {
199 for (_pos, predicate) in self.predicates.iter() {
200 if let MirScalarExpr::CallBinary {
201 func: crate::BinaryFunc::Eq(_),
202 expr1,
203 expr2,
204 } = predicate
205 {
206 if let Some(Ok(datum1)) = expr1.as_literal() {
207 if &**expr2 == expr {
208 return Some(datum1);
209 }
210 }
211 if let Some(Ok(datum2)) = expr2.as_literal() {
212 if &**expr1 == expr {
213 return Some(datum2);
214 }
215 }
216 }
217 }
218 None
219 }
220
221 /// Determines if a sequence of scalar expressions must be equal to a literal row.
222 ///
223 /// This method returns `None` on an empty `exprs`, which might be surprising, but
224 /// seems to line up with its callers' expectations of that being a non-constraint.
225 /// The caller knows if `exprs` is empty, and can modify their behavior appropriately.
226 /// if they would rather have a literal empty row.
227 pub fn literal_constraints(&self, exprs: &[MirScalarExpr]) -> Option<Row> {
228 if exprs.is_empty() {
229 return None;
230 }
231 let mut row = Row::default();
232 let mut packer = row.packer();
233 for expr in exprs {
234 if let Some(literal) = self.literal_constraint(expr) {
235 packer.push(literal);
236 } else {
237 return None;
238 }
239 }
240 Some(row)
241 }
242
243 /// Extracts any MapFilterProject at the root of the expression.
244 ///
245 /// The expression will be modified to extract any maps, filters, and
246 /// projections, which will be returned as `Self`. If there are no maps,
247 /// filters, or projections the method will return an identity operator.
248 ///
249 /// The extracted expressions may contain temporal predicates, and one
250 /// should be careful to apply them blindly.
251 pub fn extract_from_expression(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
252 // TODO: This could become iterative rather than recursive if
253 // we were able to fuse MFP operators from below, rather than
254 // from above.
255 match expr {
256 MirRelationExpr::Map { input, scalars } => {
257 let (mfp, expr) = Self::extract_from_expression(input);
258 (mfp.map(scalars.iter().cloned()), expr)
259 }
260 MirRelationExpr::Filter { input, predicates } => {
261 let (mfp, expr) = Self::extract_from_expression(input);
262 (mfp.filter(predicates.iter().cloned()), expr)
263 }
264 MirRelationExpr::Project { input, outputs } => {
265 let (mfp, expr) = Self::extract_from_expression(input);
266 (mfp.project(outputs.iter().cloned()), expr)
267 }
268 // TODO: The recursion is quadratic in the number of Map/Filter/Project operators due to
269 // this call to `arity()`.
270 x => (Self::new(x.arity()), x),
271 }
272 }
273
274 /// Extracts an error-free MapFilterProject at the root of the expression.
275 ///
276 /// The expression will be modified to extract maps, filters, and projects
277 /// from the root of the expression, which will be returned as `Self`. The
278 /// extraction will halt if a Map or Filter containing a literal error is
279 /// reached. Otherwise, the method will return an identity operator.
280 ///
281 /// This method is meant to be used during optimization, where it is
282 /// necessary to avoid moving around maps and filters with errors.
283 pub fn extract_non_errors_from_expr(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
284 match expr {
285 MirRelationExpr::Map { input, scalars }
286 if scalars.iter().all(|s| !s.is_literal_err()) =>
287 {
288 let (mfp, expr) = Self::extract_non_errors_from_expr(input);
289 (mfp.map(scalars.iter().cloned()), expr)
290 }
291 MirRelationExpr::Filter { input, predicates }
292 if predicates.iter().all(|p| !p.is_literal_err()) =>
293 {
294 let (mfp, expr) = Self::extract_non_errors_from_expr(input);
295 (mfp.filter(predicates.iter().cloned()), expr)
296 }
297 MirRelationExpr::Project { input, outputs } => {
298 let (mfp, expr) = Self::extract_non_errors_from_expr(input);
299 (mfp.project(outputs.iter().cloned()), expr)
300 }
301 x => (Self::new(x.arity()), x),
302 }
303 }
304
    /// Extracts an error-free MapFilterProject at the root of the expression.
    ///
    /// Differs from [MapFilterProject::extract_non_errors_from_expr] by taking and returning a
    /// mutable reference.
    pub fn extract_non_errors_from_expr_ref_mut(
        expr: &mut MirRelationExpr,
    ) -> (Self, &mut MirRelationExpr) {
        // This is essentially the same code as `extract_non_errors_from_expr`, except the seemingly
        // superfluous outer if, which works around a borrow-checker issue:
        // https://github.com/rust-lang/rust/issues/54663
        if matches!(expr, MirRelationExpr::Map { input: _, scalars } if scalars.iter().all(|s| !s.is_literal_err()))
            || matches!(expr, MirRelationExpr::Filter { input: _, predicates } if predicates.iter().all(|p| !p.is_literal_err()))
            || matches!(expr, MirRelationExpr::Project { .. })
        {
            match expr {
                // A Map fuses only when none of its scalars is a literal error.
                MirRelationExpr::Map { input, scalars }
                    if scalars.iter().all(|s| !s.is_literal_err()) =>
                {
                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
                    (mfp.map(scalars.iter().cloned()), expr)
                }
                // A Filter fuses only when none of its predicates is a literal error.
                MirRelationExpr::Filter { input, predicates }
                    if predicates.iter().all(|p| !p.is_literal_err()) =>
                {
                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
                    (mfp.filter(predicates.iter().cloned()), expr)
                }
                // Projections always fuse.
                MirRelationExpr::Project { input, outputs } => {
                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
                    (mfp.project(outputs.iter().cloned()), expr)
                }
                // The outer `if` guarantees one of the arms above matched.
                _ => unreachable!(),
            }
        } else {
            (Self::new(expr.arity()), expr)
        }
    }
342
    /// Removes an error-free MapFilterProject from the root of the expression.
    ///
    /// The expression will be modified to extract maps, filters, and projects
    /// from the root of the expression, which will be returned as `Self`. The
    /// extraction will halt if a Map or Filter containing a literal error is
    /// reached. Otherwise, the method will return an
    /// identity operator, and the expression will remain unchanged.
    ///
    /// This method is meant to be used during optimization, where it is
    /// necessary to avoid moving around maps and filters with errors.
    pub fn extract_non_errors_from_expr_mut(expr: &mut MirRelationExpr) -> Self {
        match expr {
            // Fuse an error-free Map, then splice its input into `expr`'s position.
            MirRelationExpr::Map { input, scalars }
                if scalars.iter().all(|s| !s.is_literal_err()) =>
            {
                let mfp =
                    Self::extract_non_errors_from_expr_mut(input).map(scalars.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            // Fuse an error-free Filter, then splice its input into `expr`'s position.
            MirRelationExpr::Filter { input, predicates }
                if predicates.iter().all(|p| !p.is_literal_err()) =>
            {
                let mfp = Self::extract_non_errors_from_expr_mut(input)
                    .filter(predicates.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            // Projections always fuse.
            MirRelationExpr::Project { input, outputs } => {
                let mfp =
                    Self::extract_non_errors_from_expr_mut(input).project(outputs.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            // Otherwise leave `expr` unchanged and report an identity operator.
            x => Self::new(x.arity()),
        }
    }
380
    /// Extracts temporal predicates into their own `Self`.
    ///
    /// Expressions that are used by the temporal predicates are exposed by `self.projection`,
    /// though there could be justification for extracting them as well if they are otherwise
    /// unused.
    ///
    /// This separation is valuable when the execution cannot be fused into one operator.
    pub fn extract_temporal(&mut self) -> Self {
        // Optimize the expression, as it is only post-optimization that we can be certain
        // that temporal expressions are restricted to filters. We could relax this in the
        // future to be only `inline_expressions` and `remove_undemanded`, but optimization
        // seems to be the best fit at the moment.
        self.optimize();

        // Assert that we no longer have temporal expressions to evaluate. This should only
        // occur if the optimization above results with temporal expressions yielded in the
        // output, which is out of spec for how the type is meant to be used.
        assert!(!self.expressions.iter().any(|e| e.contains_temporal()));

        // Extract temporal predicates from `self.predicates`, leaving the rest in place.
        let mut temporal_predicates = Vec::new();
        self.predicates.retain(|(_position, predicate)| {
            if predicate.contains_temporal() {
                temporal_predicates.push(predicate.clone());
                false
            } else {
                true
            }
        });

        // Determine extended input columns used by temporal filters.
        let mut support = BTreeSet::new();
        for predicate in temporal_predicates.iter() {
            support.extend(predicate.support());
        }

        // Discover the locations of these columns after `self.projection`.
        let old_projection_len = self.projection.len();
        let mut new_location = BTreeMap::new();
        for original in support.iter() {
            if let Some(position) = self.projection.iter().position(|x| x == original) {
                new_location.insert(*original, position);
            } else {
                // Not currently projected: append it so the temporal operator can see it.
                new_location.insert(*original, self.projection.len());
                self.projection.push(*original);
            }
        }
        // Permute references in extracted predicates to their new locations.
        for predicate in temporal_predicates.iter_mut() {
            predicate.permute_map(&new_location);
        }

        // Form a new `Self` containing the temporal predicates to return.
        // Its projection restores the original output columns, hiding any
        // columns appended above solely for the temporal predicates.
        Self::new(self.projection.len())
            .filter(temporal_predicates)
            .project(0..old_projection_len)
    }
438
    /// Extracts common expressions from multiple `Self` into a result `Self`.
    ///
    /// The argument `mfps` are mutated so that each are functionally equivalent to their
    /// corresponding input, when composed atop the resulting `Self`.
    ///
    /// # Panics
    ///
    /// Panics if `mfps` is empty.
    pub fn extract_common(mfps: &mut [&mut Self]) -> Self {
        match mfps.len() {
            0 => {
                panic!("Cannot call method on empty arguments");
            }
            1 => {
                // A single mfp is extracted wholesale, leaving an identity behind.
                let output_arity = mfps[0].projection.len();
                std::mem::replace(mfps[0], MapFilterProject::new(output_arity))
            }
            _ => {
                // More generally, we convert each mfp to ANF, at which point we can
                // repeatedly extract atomic expressions that depend only on input
                // columns, migrate them to an input mfp, and repeat until no such
                // expressions exist. At this point, we can also migrate predicates
                // and then determine and push down projections.

                // Prepare a return `Self`.
                let mut result_mfp = MapFilterProject::new(mfps[0].input_arity);

                // We convert each mfp to ANF, using `memoize_expressions`.
                for mfp in mfps.iter_mut() {
                    mfp.memoize_expressions();
                }

                // We repeatedly extract common expressions, until none remain.
                let mut done = false;
                while !done {
                    // We use references to determine common expressions, and must
                    // introduce a scope here to drop the borrows before mutation.
                    let common = {
                        // The input arity may increase as we iterate, so recapture.
                        let input_arity = result_mfp.projection.len();
                        // Start from the first mfp's expressions that depend only on
                        // (current) input columns, then intersect with each other mfp.
                        let mut prev: BTreeSet<_> = mfps[0]
                            .expressions
                            .iter()
                            .filter(|e| e.support().iter().max() < Some(&input_arity))
                            .collect();
                        let mut next = BTreeSet::default();
                        for mfp in mfps[1..].iter() {
                            for expr in mfp.expressions.iter() {
                                if prev.contains(expr) {
                                    next.insert(expr);
                                }
                            }
                            std::mem::swap(&mut prev, &mut next);
                            next.clear();
                        }
                        prev.into_iter().cloned().collect::<Vec<_>>()
                    };
                    // Without new common expressions, we should terminate the loop.
                    done = common.is_empty();

                    // Migrate each expression in `common` to `result_mfp`.
                    for expr in common.into_iter() {
                        // Update each mfp by removing expr and updating column references.
                        for mfp in mfps.iter_mut() {
                            // With `expr` next in `result_mfp`, it is as if we are rotating it to
                            // be the first expression in `mfp`, and then removing it from `mfp` and
                            // increasing the input arity of `mfp`.
                            let arity = result_mfp.projection.len();
                            let found = mfp.expressions.iter().position(|e| e == &expr).unwrap();
                            let index = arity + found;
                            // Column references change due to the rotation from `index` to `arity`.
                            let action = |c: &mut usize| {
                                if arity <= *c && *c < index {
                                    *c += 1;
                                } else if *c == index {
                                    *c = arity;
                                }
                            };
                            // Rotate `expr` from `found` to first, and then snip.
                            // Short circuit by simply removing and incrementing the input arity.
                            mfp.input_arity += 1;
                            mfp.expressions.remove(found);
                            // Update column references in expressions, predicates, and projections.
                            for e in mfp.expressions.iter_mut() {
                                e.visit_columns(action);
                            }
                            for (o, e) in mfp.predicates.iter_mut() {
                                e.visit_columns(action);
                                // Max out the offset for the predicate; optimization will correct.
                                *o = mfp.input_arity + mfp.expressions.len();
                            }
                            for c in mfp.projection.iter_mut() {
                                action(c);
                            }
                        }
                        // Install the expression and expose it via the projection.
                        result_mfp.expressions.push(expr);
                        result_mfp.projection.push(result_mfp.projection.len());
                    }
                }
                // As before, but easier: predicates in common to all mfps.
                let common_preds: Vec<MirScalarExpr> = {
                    let input_arity = result_mfp.projection.len();
                    let mut prev: BTreeSet<_> = mfps[0]
                        .predicates
                        .iter()
                        .map(|(_, e)| e)
                        .filter(|e| e.support().iter().max() < Some(&input_arity))
                        .collect();
                    let mut next = BTreeSet::default();
                    for mfp in mfps[1..].iter() {
                        for (_, expr) in mfp.predicates.iter() {
                            if prev.contains(expr) {
                                next.insert(expr);
                            }
                        }
                        std::mem::swap(&mut prev, &mut next);
                        next.clear();
                    }
                    // Predicates in common, that we will append to `result_mfp.predicates`.
                    prev.into_iter().cloned().collect::<Vec<_>>()
                };
                for mfp in mfps.iter_mut() {
                    mfp.predicates.retain(|(_, p)| !common_preds.contains(p));
                    mfp.optimize();
                }
                result_mfp.predicates.extend(
                    common_preds
                        .into_iter()
                        .map(|e| (result_mfp.projection.len(), e)),
                );

                // Then, look for unused columns and project them away.
                let mut common_demand = BTreeSet::new();
                for mfp in mfps.iter() {
                    common_demand.extend(mfp.demand());
                }
                // columns in `common_demand` must be retained, but others
                // may be discarded.
                let common_demand = (0..result_mfp.projection.len())
                    .filter(|x| common_demand.contains(x))
                    .collect::<Vec<_>>();
                // Map retained columns to their new, dense positions.
                let remap = common_demand
                    .iter()
                    .cloned()
                    .enumerate()
                    .map(|(new, old)| (old, new))
                    .collect::<BTreeMap<_, _>>();
                for mfp in mfps.iter_mut() {
                    mfp.permute_fn(|c| remap[&c], common_demand.len());
                }
                result_mfp = result_mfp.project(common_demand);

                // Return the resulting MFP.
                result_mfp.optimize();
                result_mfp
            }
        }
    }
596
597 /// Returns `self`, and leaves behind an identity operator that acts on its output.
598 pub fn take(&mut self) -> Self {
599 let mut identity = Self::new(self.projection.len());
600 std::mem::swap(self, &mut identity);
601 identity
602 }
603
    /// Convert the `MapFilterProject` into a staged evaluation plan.
    ///
    /// The main behavior is extract temporal predicates, which cannot be evaluated
    /// using the standard machinery.
    ///
    /// # Errors
    ///
    /// Returns an error string when `plan::MfpPlan::create_from` rejects `self`.
    pub fn into_plan(self) -> Result<plan::MfpPlan, String> {
        plan::MfpPlan::create_from(self)
    }
611}
612
613impl MapFilterProject {
    /// Partitions `self` into two instances, one of which can be eagerly applied.
    ///
    /// The `available` argument indicates which input columns are available (keys)
    /// and in which positions (values). This information may allow some maps and
    /// filters to execute. The `input_arity` argument reports the total number of
    /// input columns (which may include some not present in `available`)
    ///
    /// This method partitions `self` in two parts, `(before, after)`, where `before`
    /// can be applied on columns present as keys in `available`, and `after` must
    /// await the introduction of the other input columns.
    ///
    /// The `before` instance will *append* any columns that can be determined from
    /// `available` but will project away any of these columns that are not needed by
    /// `after`. Importantly, this means that `before` will leave intact *all* input
    /// columns including those not referenced in `available`.
    ///
    /// The `after` instance will presume all input columns are available, followed
    /// by the appended columns of the `before` instance. It may be that some input
    /// columns can be projected away in `before` if `after` does not need them, but
    /// we leave that as something the caller can apply if needed (it is otherwise
    /// complicated to negotiate which input columns `before` should retain).
    ///
    /// To correctly reconstruct `self` from `before` and `after`, one must introduce
    /// additional input columns, permute all input columns to their locations as
    /// expected by `self`, follow this by new columns appended by `before`, and
    /// remove all other columns that may be present.
    ///
    /// # Example
    ///
    /// ```rust
    /// use mz_expr::{BinaryFunc, MapFilterProject, MirScalarExpr, func};
    ///
    /// // imagine an action on columns (a, b, c, d).
    /// let original = MapFilterProject::new(4).map(vec![
    ///     MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::AddInt64),
    ///     MirScalarExpr::column(2).call_binary(MirScalarExpr::column(4), func::AddInt64),
    ///     MirScalarExpr::column(3).call_binary(MirScalarExpr::column(5), func::AddInt64),
    /// ]).project(vec![6]);
    ///
    /// // Imagine we start with columns (b, x, a, y, c).
    /// //
    /// // The `partition` method requires a map from *expected* input columns to *actual*
    /// // input columns. In the example above, the columns a, b, and c exist, and are at
    /// // locations 2, 0, and 4 respectively. We must construct a map to this effect.
    /// let mut available_columns = std::collections::BTreeMap::new();
    /// available_columns.insert(0, 2);
    /// available_columns.insert(1, 0);
    /// available_columns.insert(2, 4);
    /// // Partition `original` using the available columns and current input arity.
    /// // This informs `partition` which columns are available, where they can be found,
    /// // and how many columns are not relevant but should be preserved.
    /// let (before, after) = original.partition(available_columns, 5);
    ///
    /// // `before` sees all five input columns, and should append `a + b + c`.
    /// assert_eq!(before, MapFilterProject::new(5).map(vec![
    ///     MirScalarExpr::column(2).call_binary(MirScalarExpr::column(0), func::AddInt64),
    ///     MirScalarExpr::column(4).call_binary(MirScalarExpr::column(5), func::AddInt64),
    /// ]).project(vec![0, 1, 2, 3, 4, 6]));
    ///
    /// // `after` expects to see `(a, b, c, d, a + b + c)`.
    /// assert_eq!(after, MapFilterProject::new(5).map(vec![
    ///     MirScalarExpr::column(3).call_binary(MirScalarExpr::column(4), func::AddInt64)
    /// ]).project(vec![5]));
    ///
    /// // To reconstruct `self`, we must introduce the columns that are not present,
    /// // and present them in the order intended by `self`. In this example, we must
    /// // introduce column d and permute the columns so that they begin (a, b, c, d).
    /// // The columns x and y must be projected away, and any columns introduced by
    /// // `begin` must be retained in their current order.
    ///
    /// // The `after` instance expects to be provided with all inputs, but it
    /// // may not need all inputs. The `demand()` and `permute()` methods can
    /// // optimize the representation.
    /// ```
    pub fn partition(self, available: BTreeMap<usize, usize>, input_arity: usize) -> (Self, Self) {
        // Map expressions, filter predicates, and projections for `before` and `after`.
        let mut before_expr = Vec::new();
        let mut before_pred = Vec::new();
        let mut before_proj = Vec::new();
        let mut after_expr = Vec::new();
        let mut after_pred = Vec::new();
        let mut after_proj = Vec::new();

        // Track which output columns must be preserved in the output of `before`.
        let mut demanded = BTreeSet::new();
        demanded.extend(0..self.input_arity);
        demanded.extend(self.projection.iter());

        // Determine which map expressions can be computed from the available subset.
        // Some expressions may depend on other expressions, but by evaluating them
        // in forward order we should accurately determine the available expressions.
        let mut available_expr = vec![false; self.input_arity];
        // Initialize available columns from `available`, which is then not used again.
        for index in available.keys() {
            available_expr[*index] = true;
        }
        for expr in self.expressions.into_iter() {
            // We treat an expression as available if its supporting columns are available,
            // and if it is not a literal (we want to avoid pushing down literals). This
            // choice is ad-hoc, but the intent is that we partition the operators so
            // that we can reduce the row representation size and total computation.
            // Pushing down literals harms the former and does nothing for the latter.
            // In the future, we'll want to have a harder think about this trade-off, as
            // we are certainly making sub-optimal decisions by pushing down all available
            // work.
            // TODO(mcsherry): establish better principles about what work to push down.
            let is_available =
                expr.support().into_iter().all(|i| available_expr[i]) && !expr.is_literal();
            if is_available {
                before_expr.push(expr);
            } else {
                demanded.extend(expr.support());
                after_expr.push(expr);
            }
            // Record availability so later expressions can depend on this one.
            available_expr.push(is_available);
        }

        // Determine which predicates can be computed from the available subset.
        for (_when, pred) in self.predicates.into_iter() {
            let is_available = pred.support().into_iter().all(|i| available_expr[i]);
            if is_available {
                before_pred.push(pred);
            } else {
                demanded.extend(pred.support());
                after_pred.push(pred);
            }
        }

        // Map from prior output location to location in un-projected `before`.
        // This map is used to correct references in `before` but it should be
        // adjusted to reflect `before`s projection prior to use in `after`.
        let mut before_map = available;
        // Input columns include any additional undescribed columns that may
        // not be captured by the `available` argument, so we must independently
        // track the current number of columns (vs relying on `before_map.len()`).
        let mut input_columns = input_arity;
        for index in self.input_arity..available_expr.len() {
            if available_expr[index] {
                before_map.insert(index, input_columns);
                input_columns += 1;
            }
        }

        // Permute the column references in `before` expressions and predicates.
        for expr in before_expr.iter_mut() {
            expr.permute_map(&before_map);
        }
        for pred in before_pred.iter_mut() {
            pred.permute_map(&before_map);
        }

        // Demand information determines `before`s output projection.
        // Specifically, we produce all input columns in the output, as well as
        // any columns that are available and demanded.
        before_proj.extend(0..input_arity);
        for index in self.input_arity..available_expr.len() {
            // If an intermediate result is both available and demanded,
            // we should produce it as output.
            if available_expr[index] && demanded.contains(&index) {
                // Use the new location of `index`.
                before_proj.push(before_map[&index]);
            }
        }

        // Map from prior output locations to location in post-`before` columns.
        // This map is used to correct references in `after`.
        // The presumption is that `after` will be presented with all input columns,
        // followed by the output columns introduced by `before` in order.
        let mut after_map = BTreeMap::new();
        for index in 0..self.input_arity {
            after_map.insert(index, index);
        }
        for index in self.input_arity..available_expr.len() {
            // If an intermediate result is both available and demanded,
            // it was produced as output.
            if available_expr[index] && demanded.contains(&index) {
                // We expect to find the output as far after `self.input_arity` as
                // it was produced after `input_arity` in the output of `before`.
                let location = self.input_arity
                    + (before_proj
                        .iter()
                        .position(|x| x == &before_map[&index])
                        .unwrap()
                        - input_arity);
                after_map.insert(index, location);
            }
        }
        // We must now re-map the remaining non-demanded expressions, which are
        // contiguous rather than potentially interspersed.
        for index in self.input_arity..available_expr.len() {
            if !available_expr[index] {
                after_map.insert(index, after_map.len());
            }
        }

        // Permute the column references in `after` expressions and predicates.
        for expr in after_expr.iter_mut() {
            expr.permute_map(&after_map);
        }
        for pred in after_pred.iter_mut() {
            pred.permute_map(&after_map);
        }
        // Populate `after` projection with the new locations of `self.projection`.
        for index in self.projection {
            after_proj.push(after_map[&index]);
        }

        // Form and return the before and after MapFilterProject instances.
        let before = Self::new(input_arity)
            .map(before_expr)
            .filter(before_pred)
            .project(before_proj.clone());
        let after = Self::new(self.input_arity + (before_proj.len() - input_arity))
            .map(after_expr)
            .filter(after_pred)
            .project(after_proj);
        (before, after)
    }
832
833 /// Lists input columns whose values are used in outputs.
834 ///
835 /// It is entirely appropriate to determine the demand of an instance
836 /// and then both apply a projection to the subject of the instance and
837 /// `self.permute` this instance.
838 pub fn demand(&self) -> BTreeSet<usize> {
839 let mut demanded = BTreeSet::new();
840 for (_index, pred) in self.predicates.iter() {
841 demanded.extend(pred.support());
842 }
843 demanded.extend(self.projection.iter().cloned());
844 for index in (0..self.expressions.len()).rev() {
845 if demanded.contains(&(self.input_arity + index)) {
846 demanded.extend(self.expressions[index].support());
847 }
848 }
849 demanded.retain(|col| col < &self.input_arity);
850 demanded
851 }
852
    /// Update input column references, due to an input projection or permutation.
    ///
    /// The `remap` function maps expected column identifiers to new locations,
    /// with the expectation that it describes all input columns, and so the
    /// intermediate results will be able to start at position `new_input_arity`.
    ///
    /// The supplied `remap` might not handle columns that are not "demanded" by the
    /// instance, and so we should ensure that `self` is optimized to not reference
    /// columns that are not demanded.
    pub fn permute_fn<F>(&mut self, remap: F, new_input_arity: usize)
    where
        F: Fn(usize) -> usize,
    {
        let (mut map, mut filter, mut project) = self.as_map_filter_project();
        let map_len = map.len();
        // Rewrites one column reference: intermediate (mapped) columns shift
        // to follow the new input arity; input columns go through `remap`.
        let action = |col: &mut usize| {
            if self.input_arity <= *col && *col < self.input_arity + map_len {
                *col = new_input_arity + (*col - self.input_arity);
            } else {
                *col = remap(*col);
            }
        };
        for expr in map.iter_mut() {
            expr.visit_columns(action);
        }
        for pred in filter.iter_mut() {
            pred.visit_columns(action);
        }
        for proj in project.iter_mut() {
            action(proj);
            // Every projected column must exist among the remapped inputs
            // or the columns produced by `map`.
            assert!(*proj < new_input_arity + map.len());
        }
        // Rebuild `self` over the new input arity to restore invariants.
        *self = Self::new(new_input_arity)
            .map(map)
            .filter(filter)
            .project(project)
    }
890}
891
892// Optimization routines.
893impl MapFilterProject {
894 /// Optimize the internal expression evaluation order.
895 ///
896 /// This method performs several optimizations that are meant to streamline
897 /// the execution of the `MapFilterProject` instance, but not to alter its
898 /// semantics. This includes extracting expressions that are used multiple
899 /// times, inlining those that are not, and removing expressions that are
900 /// unreferenced.
901 ///
902 /// This method will inline all temporal expressions, and remove any columns
903 /// that are not demanded by the output, which should transform any temporal
904 /// filters to a state where the temporal expressions exist only in the list
905 /// of predicates.
906 ///
907 /// # Example
908 ///
909 /// This example demonstrates how the re-use of one expression, converting
910 /// column 1 from a string to an integer, can be extracted and the results
911 /// shared among the two uses. This example is used for each of the steps
912 /// along the optimization path.
913 ///
914 /// ```rust
915 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
916 /// // Demonstrate extraction of common expressions (here: parsing strings).
917 /// let mut map_filter_project = MapFilterProject::new(5)
918 /// .map(vec![
919 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(1).call_unary(func::CastStringToInt64), func::AddInt64),
920 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
921 /// ])
922 /// .project(vec![3,4,5,6]);
923 ///
924 /// let mut expected_optimized = MapFilterProject::new(5)
925 /// .map(vec![
926 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
927 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(5), func::AddInt64),
928 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
929 /// ])
930 /// .project(vec![3,4,6,7]);
931 ///
932 /// // Optimize the expression.
933 /// map_filter_project.optimize();
934 ///
935 /// assert_eq!(
936 /// map_filter_project,
937 /// expected_optimized,
938 /// );
939 /// ```
    pub fn optimize(&mut self) {
        // Track sizes and iterate as long as they decrease.
        let mut prev_size = None;
        let mut self_size = usize::max_value();
        // Continue as long as strict improvements occur.
        while prev_size.map(|p| self_size < p).unwrap_or(true) {
            // Lock in current size.
            prev_size = Some(self_size);

            // We have an annoying pattern of mapping literals that already exist as columns (by filters).
            // Try to identify this pattern, of a map that introduces an expression equated to a prior column,
            // and then replace the mapped expression by a column reference.
            //
            // We think this is due to `LiteralLifting`, and we might investigate removing the introduction in
            // the first place. The tell-tale that we see when we fix it is a diff that looks like
            //
            // - Project (#0, #2)
            // - Filter (#1 = 1)
            // - Map (1)
            // - Get l0
            // + Filter (#1 = 1)
            // + Get l0
            //
            for (index, expr) in self.expressions.iter_mut().enumerate() {
                // If `expr` matches a filter equating it to a column < index + input_arity, rewrite it
                for (_, predicate) in self.predicates.iter() {
                    if let MirScalarExpr::CallBinary {
                        func: crate::BinaryFunc::Eq(_),
                        expr1,
                        expr2,
                    } = predicate
                    {
                        // Either side of the equality may be the prior column.
                        if let MirScalarExpr::Column(c, name) = &**expr1 {
                            if *c < index + self.input_arity && &**expr2 == expr {
                                *expr = MirScalarExpr::Column(*c, name.clone());
                            }
                        }
                        if let MirScalarExpr::Column(c, name) = &**expr2 {
                            if *c < index + self.input_arity && &**expr1 == expr {
                                *expr = MirScalarExpr::Column(*c, name.clone());
                            }
                        }
                    }
                }
            }

            // Optimization memoizes individual `ScalarExpr` expressions that
            // are sure to be evaluated, canonicalizes references to the first
            // occurrence of each, inlines expressions that have a reference
            // count of one, and then removes any expressions that are not
            // referenced.
            self.memoize_expressions();
            self.predicates.sort();
            self.predicates.dedup();
            self.inline_expressions();
            self.remove_undemanded();

            // Re-build `self` from parts to restore evaluation order invariants.
            let (map, filter, project) = self.as_map_filter_project();
            *self = Self::new(self.input_arity)
                .map(map)
                .filter(filter)
                .project(project);

            self_size = self.size();
        }
    }
1007
1008 /// Total expression sizes across all expressions.
1009 pub fn size(&self) -> usize {
1010 self.expressions.iter().map(|e| e.size()).sum::<usize>()
1011 + self.predicates.iter().map(|(_, e)| e.size()).sum::<usize>()
1012 }
1013
1014 /// Place each certainly evaluated expression in its own column.
1015 ///
1016 /// This method places each non-trivial, certainly evaluated expression
1017 /// in its own column, and deduplicates them so that all references to
1018 /// the same expression reference the same column.
1019 ///
1020 /// This transformation is restricted to expressions we are certain will
1021 /// be evaluated, which does not include expressions in `if` statements.
1022 ///
1023 /// # Example
1024 ///
1025 /// This example demonstrates how memoization notices `MirScalarExpr`s
1026 /// that are used multiple times, and ensures that each are extracted
1027 /// into columns and then referenced by column. This pass does not try
1028 /// to minimize the occurrences of column references, which will happen
1029 /// in inlining.
1030 ///
1031 /// ```rust
1032 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1033 /// // Demonstrate extraction of common expressions (here: parsing strings).
1034 /// let mut map_filter_project = MapFilterProject::new(5)
1035 /// .map(vec![
1036 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(1).call_unary(func::CastStringToInt64), func::AddInt64),
1037 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1038 /// ])
1039 /// .project(vec![3,4,5,6]);
1040 ///
1041 /// let mut expected_optimized = MapFilterProject::new(5)
1042 /// .map(vec![
1043 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1044 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1045 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), func::AddInt64),
1046 /// MirScalarExpr::column(7),
1047 /// MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1048 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), func::AddInt64),
1049 /// MirScalarExpr::column(10),
1050 /// ])
1051 /// .project(vec![3,4,8,11]);
1052 ///
1053 /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1054 /// map_filter_project.memoize_expressions();
1055 ///
1056 /// assert_eq!(
1057 /// map_filter_project,
1058 /// expected_optimized,
1059 /// );
1060 /// ```
1061 ///
1062 /// Expressions may not be memoized if they are not certain to be evaluated,
1063 /// for example if they occur in conditional branches of a `MirScalarExpr::If`.
1064 ///
1065 /// ```rust
1066 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1067 /// // Demonstrate extraction of unconditionally evaluated expressions, as well as
1068 /// // the non-extraction of common expressions guarded by conditions.
1069 /// let mut map_filter_project = MapFilterProject::new(2)
1070 /// .map(vec![
1071 /// MirScalarExpr::If {
1072 /// cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt)),
1073 /// then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1074 /// els: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1075 /// },
1076 /// MirScalarExpr::If {
1077 /// cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt)),
1078 /// then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1079 /// els: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1080 /// },
1081 /// ]);
1082 ///
1083 /// let mut expected_optimized = MapFilterProject::new(2)
1084 /// .map(vec![
1085 /// MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::Lt),
1086 /// MirScalarExpr::If {
1087 /// cond: Box::new(MirScalarExpr::column(2)),
1088 /// then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1089 /// els: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1090 /// },
1091 /// MirScalarExpr::column(3),
1092 /// MirScalarExpr::If {
1093 /// cond: Box::new(MirScalarExpr::column(2)),
1094 /// then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), func::DivInt64)),
1095 /// els: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), func::DivInt64)),
1096 /// },
1097 /// MirScalarExpr::column(5),
1098 /// ])
1099 /// .project(vec![0,1,4,6]);
1100 ///
1101 /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1102 /// map_filter_project.memoize_expressions();
1103 ///
1104 /// assert_eq!(
1105 /// map_filter_project,
1106 /// expected_optimized,
1107 /// );
1108 /// ```
    pub fn memoize_expressions(&mut self) {
        // Record the mapping from starting column references to new column
        // references.
        let mut remaps = BTreeMap::new();
        // Input columns are never relocated by memoization.
        for index in 0..self.input_arity {
            remaps.insert(index, index);
        }
        let mut new_expressions = Vec::new();

        // We follow the same order as for evaluation, to ensure that all
        // column references exist in time for their evaluation. We could
        // prioritize predicates, but we would need to be careful to chase
        // down column references to expressions and memoize those as well.
        let mut expression = 0;
        for (support, predicate) in self.predicates.iter_mut() {
            // First memoize every expression that must be evaluated before
            // this predicate (those below its recorded `support` position).
            while self.input_arity + expression < *support {
                self.expressions[expression].permute_map(&remaps);
                memoize_expr(
                    &mut self.expressions[expression],
                    &mut new_expressions,
                    self.input_arity,
                );
                // The expression's new location is wherever its clone lands
                // in `new_expressions`, *after* any memoized sub-parts that
                // `memoize_expr` just appended.
                remaps.insert(
                    self.input_arity + expression,
                    self.input_arity + new_expressions.len(),
                );
                new_expressions.push(self.expressions[expression].clone());
                expression += 1;
            }
            predicate.permute_map(&remaps);
            memoize_expr(predicate, &mut new_expressions, self.input_arity);
        }
        // Memoize any expressions that follow the last predicate.
        while expression < self.expressions.len() {
            self.expressions[expression].permute_map(&remaps);
            memoize_expr(
                &mut self.expressions[expression],
                &mut new_expressions,
                self.input_arity,
            );
            remaps.insert(
                self.input_arity + expression,
                self.input_arity + new_expressions.len(),
            );
            new_expressions.push(self.expressions[expression].clone());
            expression += 1;
        }

        self.expressions = new_expressions;
        // Point the projection at the relocated expressions.
        for proj in self.projection.iter_mut() {
            *proj = remaps[proj];
        }

        // Restore predicate order invariants.
        for (pos, pred) in self.predicates.iter_mut() {
            *pos = pred.support().into_iter().max().map(|x| x + 1).unwrap_or(0);
        }
    }
1166
1167 /// This method inlines expressions with a single use.
1168 ///
1169 /// This method only inlines expressions; it does not delete expressions
1170 /// that are no longer referenced. The `remove_undemanded()` method does
1171 /// that, and should likely be used after this method.
1172 ///
1173 /// Inlining replaces column references when the referred-to item is either
1174 /// another column reference, or the only referrer of its referent. This
1175 /// is most common after memoization has atomized all expressions to seek
1176 /// out re-use: inlining re-assembles expressions that were not helpfully
1177 /// shared with other expressions.
1178 ///
1179 /// # Example
1180 ///
1181 /// In this example, we see that with only a single reference to columns
1182 /// 0 and 2, their parsing can each be inlined. Similarly, column references
1183 /// can be cleaned up among expressions, and in the final projection.
1184 ///
1185 /// Also notice the remaining expressions, which can be cleaned up in a later
1186 /// pass (the `remove_undemanded` method).
1187 ///
1188 /// ```rust
1189 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1190 /// // Use the output from first `memoize_expression` example.
1191 /// let mut map_filter_project = MapFilterProject::new(5)
1192 /// .map(vec![
1193 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1194 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1195 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), func::AddInt64),
1196 /// MirScalarExpr::column(7),
1197 /// MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1198 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), func::AddInt64),
1199 /// MirScalarExpr::column(10),
1200 /// ])
1201 /// .project(vec![3,4,8,11]);
1202 ///
1203 /// let mut expected_optimized = MapFilterProject::new(5)
1204 /// .map(vec![
1205 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1206 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1207 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1208 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1209 /// MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1210 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1211 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1212 /// ])
1213 /// .project(vec![3,4,8,11]);
1214 ///
1215 /// // Inline expressions that are referenced only once.
1216 /// map_filter_project.inline_expressions();
1217 ///
1218 /// assert_eq!(
1219 /// map_filter_project,
1220 /// expected_optimized,
1221 /// );
1222 /// ```
1223 pub fn inline_expressions(&mut self) {
1224 // Local copy of input_arity to avoid borrowing `self` in closures.
1225 let input_arity = self.input_arity;
1226 // Reference counts track the number of places that a reference occurs.
1227 let mut reference_count = vec![0; input_arity + self.expressions.len()];
1228 // Increment reference counts for each use
1229 for expr in self.expressions.iter() {
1230 expr.visit_pre(|e| {
1231 if let MirScalarExpr::Column(i, _name) = e {
1232 reference_count[*i] += 1;
1233 }
1234 });
1235 }
1236 for (_, pred) in self.predicates.iter() {
1237 pred.visit_pre(|e| {
1238 if let MirScalarExpr::Column(i, _name) = e {
1239 reference_count[*i] += 1;
1240 }
1241 });
1242 }
1243 for proj in self.projection.iter() {
1244 reference_count[*proj] += 1;
1245 }
1246
1247 // Determine which expressions should be inlined because they reference temporal expressions.
1248 let mut is_temporal = vec![false; input_arity];
1249 for expr in self.expressions.iter() {
1250 // An express may contain a temporal expression, or reference a column containing such.
1251 is_temporal.push(
1252 expr.contains_temporal() || expr.support().into_iter().any(|col| is_temporal[col]),
1253 );
1254 }
1255
1256 // Inline only those columns that 1. are expressions not inputs, and
1257 // 2a. are column references or literals or 2b. have a refcount of 1,
1258 // or 2c. reference temporal expressions (which cannot be evaluated).
1259 let mut should_inline = vec![false; reference_count.len()];
1260 for i in (input_arity..reference_count.len()).rev() {
1261 if let MirScalarExpr::Column(c, _) = self.expressions[i - input_arity] {
1262 should_inline[i] = true;
1263 // The reference count of the referenced column should be
1264 // incremented with the number of references
1265 // `self.expressions[i - input_arity]` has.
1266 // Subtract 1 because `self.expressions[i - input_arity]` is
1267 // itself a reference.
1268 reference_count[c] += reference_count[i] - 1;
1269 } else {
1270 should_inline[i] = reference_count[i] == 1 || is_temporal[i];
1271 }
1272 }
1273 // Inline expressions per `should_inline`.
1274 self.perform_inlining(should_inline);
1275 // We can only inline column references in `self.projection`, but we should.
1276 for proj in self.projection.iter_mut() {
1277 if *proj >= self.input_arity {
1278 if let MirScalarExpr::Column(i, _) = self.expressions[*proj - self.input_arity] {
1279 // TODO(mgree) !!! propagate name information to projection
1280 *proj = i;
1281 }
1282 }
1283 }
1284 }
1285
1286 /// Inlines those expressions that are indicated by should_inline.
1287 /// See `inline_expressions` for usage.
1288 pub fn perform_inlining(&mut self, should_inline: Vec<bool>) {
1289 for index in 0..self.expressions.len() {
1290 let (prior, expr) = self.expressions.split_at_mut(index);
1291 #[allow(deprecated)]
1292 expr[0].visit_mut_post_nolimit(&mut |e| {
1293 if let MirScalarExpr::Column(i, _name) = e {
1294 if should_inline[*i] {
1295 *e = prior[*i - self.input_arity].clone();
1296 }
1297 }
1298 });
1299 }
1300 for (_index, pred) in self.predicates.iter_mut() {
1301 let expressions = &self.expressions;
1302 #[allow(deprecated)]
1303 pred.visit_mut_post_nolimit(&mut |e| {
1304 if let MirScalarExpr::Column(i, _name) = e {
1305 if should_inline[*i] {
1306 *e = expressions[*i - self.input_arity].clone();
1307 }
1308 }
1309 });
1310 }
1311 }
1312
1313 /// Removes unused expressions from `self.expressions`.
1314 ///
1315 /// Expressions are "used" if they are relied upon by any output columns
1316 /// or any predicates, even transitively. Any expressions that are not
1317 /// relied upon in this way can be discarded.
1318 ///
1319 /// # Example
1320 ///
1321 /// ```rust
1322 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1323 /// // Use the output from `inline_expression` example.
1324 /// let mut map_filter_project = MapFilterProject::new(5)
1325 /// .map(vec![
1326 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64),
1327 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1328 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1329 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(6), func::AddInt64),
1330 /// MirScalarExpr::column(2).call_unary(func::CastStringToInt64),
1331 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1332 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1333 /// ])
1334 /// .project(vec![3,4,8,11]);
1335 ///
1336 /// let mut expected_optimized = MapFilterProject::new(5)
1337 /// .map(vec![
1338 /// MirScalarExpr::column(1).call_unary(func::CastStringToInt64),
1339 /// MirScalarExpr::column(0).call_unary(func::CastStringToInt64).call_binary(MirScalarExpr::column(5), func::AddInt64),
1340 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(func::CastStringToInt64), func::AddInt64),
1341 /// ])
1342 /// .project(vec![3,4,6,7]);
1343 ///
    /// // Remove undemanded expressions, streamlining the work done.
1345 /// map_filter_project.remove_undemanded();
1346 ///
1347 /// assert_eq!(
1348 /// map_filter_project,
1349 /// expected_optimized,
1350 /// );
1351 /// ```
    pub fn remove_undemanded(&mut self) {
        // Determine the demanded expressions to remove irrelevant ones.
        let mut demand = BTreeSet::new();
        for (_index, pred) in self.predicates.iter() {
            demand.extend(pred.support());
        }
        // Start from the output columns as presumed demanded.
        // If this is not the case, the caller should project some away.
        demand.extend(self.projection.iter().cloned());
        // Proceed in *reverse* order, as expressions may depend on other
        // expressions that precede them.
        for index in (0..self.expressions.len()).rev() {
            if demand.contains(&(self.input_arity + index)) {
                demand.extend(self.expressions[index].support());
            }
        }

        // Maintain a map from initial column identifiers to locations
        // once we have removed undemanded expressions.
        let mut remap = BTreeMap::new();
        // This map only needs to map elements of `demand` to a new location,
        // but the logic is easier if we include all input columns (as the
        // new position is then determined by the size of the map).
        for index in 0..self.input_arity {
            remap.insert(index, index);
        }
        // Retain demanded expressions, and record their new locations.
        let mut new_expressions = Vec::new();
        for (index, expr) in self.expressions.drain(..).enumerate() {
            if demand.contains(&(index + self.input_arity)) {
                // `remap.len()` is the next unoccupied column position.
                remap.insert(index + self.input_arity, remap.len());
                new_expressions.push(expr);
            }
        }
        self.expressions = new_expressions;

        // Update column identifiers; rebuild `Self` to re-establish any invariants.
        // We mirror `self.permute(&remap)` but we specifically want to remap columns
        // that are produced by `self.expressions` after the input columns.
        let (expressions, predicates, projection) = self.as_map_filter_project();
        *self = Self::new(self.input_arity)
            .map(expressions.into_iter().map(|mut e| {
                e.permute_map(&remap);
                e
            }))
            .filter(predicates.into_iter().map(|mut p| {
                p.permute_map(&remap);
                p
            }))
            .project(projection.into_iter().map(|c| remap[&c]));
    }
1403}
1404
1405// TODO: move this elsewhere?
1406/// Recursively memoize parts of `expr`, storing those parts in `memoized_parts`.
1407///
1408/// A part of `expr` that is memoized is replaced by a reference to column
1409/// `(input_arity + pos)`, where `pos` is the position of the memoized part in
1410/// `memoized_parts`, and `input_arity` is the arity of the input that `expr`
1411/// refers to.
pub fn memoize_expr(
    expr: &mut MirScalarExpr,
    memoized_parts: &mut Vec<MirScalarExpr>,
    input_arity: usize,
) {
    #[allow(deprecated)]
    expr.visit_mut_pre_post_nolimit(
        &mut |e| {
            // We should not eagerly memoize `if` branches that might not be taken.
            // TODO: Memoize expressions in the intersection of `then` and `els`.
            if let MirScalarExpr::If { cond, .. } = e {
                return Some(vec![cond]);
            }

            // We should not eagerly memoize `COALESCE` expressions after the first,
            // as they are only meant to be evaluated if the preceding expressions
            // evaluate to NULL. We could memoize preceding expressions that are
            // certain not to error.
            if let MirScalarExpr::CallVariadic {
                func: crate::VariadicFunc::Coalesce,
                exprs,
            } = e
            {
                return Some(exprs.iter_mut().take(1).collect());
            }

            // We should not deconstruct temporal filters, because `MfpPlan::create_from` expects
            // those to be in a specific form. However, we _should_ attend to the expression that is
            // on the opposite side of mz_now(), because it might be a complex expression in itself,
            // and is ok to deconstruct.
            if let Some((_func, other_side)) = e.as_mut_temporal_filter().ok() {
                return Some(vec![other_side]);
            }

            None
        },
        &mut |e| {
            match e {
                MirScalarExpr::Literal(_, _) => {
                    // Literals do not need to be memoized.
                }
                MirScalarExpr::Column(col, _) => {
                    // Column references do not need to be memoized, but may need to be
                    // updated if they reference a column reference themselves.
                    // NOTE(review): the strict `>` skips `*col == input_arity`
                    // (i.e. `memoized_parts[0]`), leaving such a chain
                    // uncollapsed; confirm whether `>=` was intended.
                    if *col > input_arity {
                        if let MirScalarExpr::Column(col2, _) = memoized_parts[*col - input_arity] {
                            // We do _not_ propagate column names, since mis-associating names and column
                            // references will be very confusing (and possibly bug-inducing).
                            *col = col2;
                        }
                    }
                }
                _ => {
                    // TODO: OOO (Optimizer Optimization Opportunity):
                    // we are quadratic in expression size because of this .iter().position
                    if let Some(position) = memoized_parts.iter().position(|e2| e2 == e) {
                        // Any complex expression that already exists as a prior column can
                        // be replaced by a reference to that column.
                        *e = MirScalarExpr::column(input_arity + position);
                    } else {
                        // A complex expression that does not exist should be memoized, and
                        // replaced by a reference to the column.
                        memoized_parts.push(std::mem::replace(
                            e,
                            MirScalarExpr::column(input_arity + memoized_parts.len()),
                        ));
                    }
                }
            }
        },
    )
}
1484
1485pub mod util {
1486 use std::collections::BTreeMap;
1487
1488 use crate::MirScalarExpr;
1489
    #[allow(dead_code)]
    /// A triple of actions that map from rows to (key, val) pairs and back again.
    ///
    /// Currently unused (hence `allow(dead_code)` above).
    struct KeyValRowMapping {
        /// Expressions to apply to a row to produce key datums.
        to_key: Vec<MirScalarExpr>,
        /// Columns to project from a row to produce residual value datums.
        to_val: Vec<usize>,
        /// Columns to project from the concatenation of key and value to reconstruct the row.
        to_row: Vec<usize>,
    }
1500
1501 /// Derive supporting logic to support transforming rows to (key, val) pairs,
1502 /// and back again.
1503 ///
1504 /// We are given as input a list of key expressions and an input arity, and the
1505 /// requirement the produced key should be the application of the key expressions.
1506 /// To produce the `val` output, we will identify those input columns not found in
1507 /// the key expressions, and name all other columns.
1508 /// To reconstitute the original row, we identify the sequence of columns from the
1509 /// concatenation of key and val which would reconstruct the original row.
1510 ///
1511 /// The output is a pair of column sequences, the first used to reconstruct a row
1512 /// from the concatenation of key and value, and the second to identify the columns
1513 /// of a row that should become the value associated with its key.
1514 ///
1515 /// The permutations and thinning expressions generated here will be tracked in
1516 /// `dataflow::plan::AvailableCollections`; see the
1517 /// documentation there for more details.
1518 pub fn permutation_for_arrangement(
1519 key: &[MirScalarExpr],
1520 unthinned_arity: usize,
1521 ) -> (Vec<usize>, Vec<usize>) {
1522 let columns_in_key: BTreeMap<_, _> = key
1523 .iter()
1524 .enumerate()
1525 .filter_map(|(i, key_col)| key_col.as_column().map(|c| (c, i)))
1526 .collect();
1527 let mut input_cursor = key.len();
1528 let permutation = (0..unthinned_arity)
1529 .map(|c| {
1530 if let Some(c) = columns_in_key.get(&c) {
1531 // Column is in key (and thus gone from the value
1532 // of the thinned representation)
1533 *c
1534 } else {
1535 // Column remains in value of the thinned representation
1536 input_cursor += 1;
1537 input_cursor - 1
1538 }
1539 })
1540 .collect();
1541 let thinning = (0..unthinned_arity)
1542 .filter(|c| !columns_in_key.contains_key(c))
1543 .collect();
1544 (permutation, thinning)
1545 }
1546
1547 /// Given the permutations (see [`permutation_for_arrangement`] and
1548 /// (`dataflow::plan::AvailableCollections`) corresponding to two
1549 /// collections with the same key arity,
1550 /// computes the permutation for the result of joining them.
1551 pub fn join_permutations(
1552 key_arity: usize,
1553 stream_permutation: Vec<usize>,
1554 thinned_stream_arity: usize,
1555 lookup_permutation: Vec<usize>,
1556 ) -> BTreeMap<usize, usize> {
1557 let stream_arity = stream_permutation.len();
1558 let lookup_arity = lookup_permutation.len();
1559
1560 (0..stream_arity + lookup_arity)
1561 .map(|i| {
1562 let location = if i < stream_arity {
1563 stream_permutation[i]
1564 } else {
1565 let location_in_lookup = lookup_permutation[i - stream_arity];
1566 if location_in_lookup < key_arity {
1567 location_in_lookup
1568 } else {
1569 location_in_lookup + thinned_stream_arity
1570 }
1571 };
1572 (i, location)
1573 })
1574 .collect()
1575 }
1576}
1577
1578pub mod plan {
1579 use std::iter;
1580
1581 use mz_repr::{Datum, Diff, Row, RowArena};
1582 use serde::{Deserialize, Serialize};
1583
1584 use crate::{BinaryFunc, EvalError, MapFilterProject, MirScalarExpr, UnaryFunc, func};
1585
    /// A wrapper type which indicates it is safe to simply evaluate all expressions.
    ///
    /// In particular, the contained predicates include no temporal expressions:
    /// [`MfpPlan::create_from`] extracts those before wrapping the remainder in
    /// this type.
    #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
    pub struct SafeMfpPlan {
        /// The wrapped `MapFilterProject`, all of whose expressions and
        /// predicates can be evaluated directly, row by row.
        pub(crate) mfp: MapFilterProject,
    }
1591
1592 impl SafeMfpPlan {
1593 /// Remaps references to input columns according to `remap`.
1594 ///
1595 /// Leaves other column references, e.g. to newly mapped columns, unchanged.
1596 pub fn permute_fn<F>(&mut self, remap: F, new_arity: usize)
1597 where
1598 F: Fn(usize) -> usize,
1599 {
1600 self.mfp.permute_fn(remap, new_arity);
1601 }
1602 /// Evaluates the linear operator on a supplied list of datums.
1603 ///
1604 /// The arguments are the initial datums associated with the row,
1605 /// and an appropriately lifetimed arena for temporary allocations
1606 /// needed by scalar evaluation.
1607 ///
1608 /// An `Ok` result will either be `None` if any predicate did not
1609 /// evaluate to `Datum::True`, or the values of the columns listed
1610 /// by `self.projection` if all predicates passed. If an error
1611 /// occurs in the evaluation it is returned as an `Err` variant.
1612 /// As the evaluation exits early with failed predicates, it may
1613 /// miss some errors that would occur later in evaluation.
1614 ///
1615 /// The `row` is not cleared first, but emptied if the function
1616 /// returns `Ok(Some(row)).
1617 #[inline(always)]
1618 pub fn evaluate_into<'a, 'row>(
1619 &'a self,
1620 datums: &mut Vec<Datum<'a>>,
1621 arena: &'a RowArena,
1622 row_buf: &'row mut Row,
1623 ) -> Result<Option<&'row Row>, EvalError> {
1624 let passed_predicates = self.evaluate_inner(datums, arena)?;
1625 if !passed_predicates {
1626 Ok(None)
1627 } else {
1628 row_buf
1629 .packer()
1630 .extend(self.mfp.projection.iter().map(|c| datums[*c]));
1631 Ok(Some(row_buf))
1632 }
1633 }
1634
1635 /// A version of `evaluate` which produces an iterator over `Datum`
1636 /// as output.
1637 ///
1638 /// This version can be useful when one wants to capture the resulting
1639 /// datums without packing and then unpacking a row.
1640 #[inline(always)]
1641 pub fn evaluate_iter<'b, 'a: 'b>(
1642 &'a self,
1643 datums: &'b mut Vec<Datum<'a>>,
1644 arena: &'a RowArena,
1645 ) -> Result<Option<impl Iterator<Item = Datum<'a>> + 'b>, EvalError> {
1646 let passed_predicates = self.evaluate_inner(datums, arena)?;
1647 if !passed_predicates {
1648 Ok(None)
1649 } else {
1650 Ok(Some(self.mfp.projection.iter().map(move |i| datums[*i])))
1651 }
1652 }
1653
1654 /// Populates `datums` with `self.expressions` and tests `self.predicates`.
1655 ///
1656 /// This does not apply `self.projection`, which is up to the calling method.
1657 pub fn evaluate_inner<'b, 'a: 'b>(
1658 &'a self,
1659 datums: &'b mut Vec<Datum<'a>>,
1660 arena: &'a RowArena,
1661 ) -> Result<bool, EvalError> {
1662 let mut expression = 0;
1663 for (support, predicate) in self.mfp.predicates.iter() {
1664 while self.mfp.input_arity + expression < *support {
1665 datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
1666 expression += 1;
1667 }
1668 if predicate.eval(&datums[..], arena)? != Datum::True {
1669 return Ok(false);
1670 }
1671 }
1672 while expression < self.mfp.expressions.len() {
1673 datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
1674 expression += 1;
1675 }
1676 Ok(true)
1677 }
1678
1679 /// Returns true if evaluation could introduce an error on non-error inputs.
1680 pub fn could_error(&self) -> bool {
1681 self.mfp.predicates.iter().any(|(_pos, e)| e.could_error())
1682 || self.mfp.expressions.iter().any(|e| e.could_error())
1683 }
1684
1685 /// Returns true when `Self` is the identity.
1686 pub fn is_identity(&self) -> bool {
1687 self.mfp.is_identity()
1688 }
1689 }
1690
    impl std::ops::Deref for SafeMfpPlan {
        type Target = MapFilterProject;
        /// Exposes the read-only API of the wrapped `MapFilterProject`
        /// directly on `SafeMfpPlan`.
        fn deref(&self) -> &Self::Target {
            &self.mfp
        }
    }
1697
    /// Predicates partitioned into temporal and non-temporal.
    ///
    /// Temporal predicates require some recognition to determine their
    /// structure, and it is best to do that once and re-use the results.
    ///
    /// There are restrictions on the temporal predicates we currently support.
    /// They must directly constrain `MzNow` from below or above,
    /// by expressions that do not themselves contain `MzNow`.
    /// Conjunctions of such constraints are also ok.
    #[derive(Clone, Debug, PartialEq)]
    pub struct MfpPlan {
        /// Normal predicates to evaluate on `&[Datum]` and expect `Ok(Datum::True)`.
        pub(crate) mfp: SafeMfpPlan,
        /// Expressions that when evaluated lower-bound `MzNow`.
        ///
        /// The maximum of these evaluations is the (inclusive) time at which
        /// `evaluate` first produces the record.
        pub(crate) lower_bounds: Vec<MirScalarExpr>,
        /// Expressions that when evaluated upper-bound `MzNow`.
        ///
        /// The minimum of these evaluations is the (exclusive) time at which
        /// `evaluate` retracts the record again.
        pub(crate) upper_bounds: Vec<MirScalarExpr>,
    }
1716
    impl MfpPlan {
        /// Partitions `predicates` into non-temporal, and lower and upper temporal bounds.
        ///
        /// The first returned list is of predicates that do not contain `mz_now`.
        /// The second and third returned lists contain expressions that, once evaluated, lower
        /// and upper bound the validity interval of a record, respectively. These second two
        /// lists are populated only by binary expressions of the form
        /// ```ignore
        /// mz_now cmp_op expr
        /// ```
        /// where `cmp_op` is a comparison operator and `expr` does not contain `mz_now`.
        ///
        /// If any unsupported expression is found, for example one that uses `mz_now`
        /// in an unsupported position, an error is returned.
        pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, String> {
            let mut lower_bounds = Vec::new();
            let mut upper_bounds = Vec::new();

            // Temporal predicates, extracted from `mfp.predicates` below.
            let mut temporal = Vec::new();

            // Optimize, to ensure that temporal predicates are moved in to `mfp.predicates`.
            mfp.optimize();

            // Remove temporal predicates from `mfp`; everything that remains is
            // safe to evaluate directly.
            mfp.predicates.retain(|(_position, predicate)| {
                if predicate.contains_temporal() {
                    temporal.push(predicate.clone());
                    false
                } else {
                    true
                }
            });

            for mut predicate in temporal.into_iter() {
                // Recognize the supported shape, or error out.
                let (func, expr2) = predicate.as_mut_temporal_filter()?;
                let expr2 = expr2.clone();

                // LogicalTimestamp <OP> <EXPR2> for several supported operators.
                //
                // Lower bounds are inclusive and upper bounds are exclusive
                // (`evaluate` retracts the record at the upper bound), so the
                // inclusive comparisons advance the bound by one step.
                match func {
                    BinaryFunc::Eq(_) => {
                        // Valid exactly at `expr2`: interval [expr2, step(expr2)).
                        lower_bounds.push(expr2.clone());
                        upper_bounds.push(
                            expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
                        );
                    }
                    BinaryFunc::Lt(_) => {
                        upper_bounds.push(expr2.clone());
                    }
                    BinaryFunc::Lte(_) => {
                        upper_bounds.push(
                            expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
                        );
                    }
                    BinaryFunc::Gt(_) => {
                        lower_bounds.push(
                            expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
                        );
                    }
                    BinaryFunc::Gte(_) => {
                        lower_bounds.push(expr2.clone());
                    }
                    _ => {
                        return Err(format!("Unsupported binary temporal operation: {:?}", func));
                    }
                }
            }

            Ok(Self {
                mfp: SafeMfpPlan { mfp },
                lower_bounds,
                upper_bounds,
            })
        }

        /// Indicates if the planned `MapFilterProject` emits exactly its inputs as outputs.
        pub fn is_identity(&self) -> bool {
            self.mfp.mfp.is_identity()
                && self.lower_bounds.is_empty()
                && self.upper_bounds.is_empty()
        }

        /// Returns `self`, and leaves behind an identity operator that acts on its output.
        pub fn take(&mut self) -> Self {
            let mut identity = Self {
                mfp: SafeMfpPlan {
                    // The identity's input arity is our output arity.
                    mfp: MapFilterProject::new(self.mfp.projection.len()),
                },
                lower_bounds: Default::default(),
                upper_bounds: Default::default(),
            };
            std::mem::swap(self, &mut identity);
            identity
        }

        /// Attempt to convert self into a non-temporal MapFilterProject plan.
        ///
        /// If that is not possible, the original instance is returned as an error.
        #[allow(clippy::result_large_err)]
        pub fn into_nontemporal(self) -> Result<SafeMfpPlan, Self> {
            if self.lower_bounds.is_empty() && self.upper_bounds.is_empty() {
                Ok(self.mfp)
            } else {
                Err(self)
            }
        }

        /// Returns an iterator over mutable references to all non-temporal
        /// scalar expressions in the plan.
        ///
        /// The order of iteration is unspecified.
        pub fn iter_nontemporal_exprs(&mut self) -> impl Iterator<Item = &mut MirScalarExpr> {
            iter::empty()
                .chain(self.mfp.mfp.predicates.iter_mut().map(|(_, expr)| expr))
                .chain(&mut self.mfp.mfp.expressions)
                .chain(&mut self.lower_bounds)
                .chain(&mut self.upper_bounds)
        }

        /// Evaluate the predicates, temporal and non-, and return times and differences for `data`.
        ///
        /// If `self` contains only non-temporal predicates, the result will either be `(time, diff)`,
        /// or an evaluation error. If `self` contains temporal predicates, the results can be times
        /// that are greater than the input `time`, and may contain negated `diff` values.
        ///
        /// The `row_builder` is not cleared first, but emptied if the function
        /// returns an iterator with any `Ok(_)` element.
        pub fn evaluate<'b, 'a: 'b, E: From<EvalError>, V: Fn(&mz_repr::Timestamp) -> bool>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
            time: mz_repr::Timestamp,
            diff: Diff,
            valid_time: V,
            row_builder: &mut Row,
        ) -> impl Iterator<
            Item = Result<(Row, mz_repr::Timestamp, Diff), (E, mz_repr::Timestamp, Diff)>,
        > + use<E, V> {
            // NOTE: every exit path is spelled `Option::into_iter().chain(Option)`
            // so that all branches share one concrete iterator type.
            // First apply the non-temporal map/filter logic; errors and
            // filtered-out rows return early.
            match self.mfp.evaluate_inner(datums, arena) {
                Err(e) => {
                    return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                }
                Ok(true) => {}
                Ok(false) => {
                    return None.into_iter().chain(None);
                }
            }

            // Lower and upper bounds.
            let mut lower_bound = time;
            let mut upper_bound = None;

            // Track whether we have seen a null in either bound, as this should
            // prevent the record from being produced at any time.
            let mut null_eval = false;

            // Advance our lower bound to be at least the result of any lower bound
            // expressions.
            for l in self.lower_bounds.iter() {
                match l.eval(datums, arena) {
                    Err(e) => {
                        return Some(Err((e.into(), time, diff)))
                            .into_iter()
                            .chain(None.into_iter());
                    }
                    Ok(Datum::MzTimestamp(d)) => {
                        lower_bound = lower_bound.max(d);
                    }
                    Ok(Datum::Null) => {
                        null_eval = true;
                    }
                    x => {
                        panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
                    }
                }
            }

            // If the lower bound exceeds our `until` frontier, it should not appear in the output.
            if !valid_time(&lower_bound) {
                return None.into_iter().chain(None);
            }

            // If there are any upper bounds, determine the minimum upper bound.
            for u in self.upper_bounds.iter() {
                // We can cease as soon as the lower and upper bounds match,
                // as the update will certainly not be produced in that case.
                if upper_bound != Some(lower_bound) {
                    match u.eval(datums, arena) {
                        Err(e) => {
                            return Some(Err((e.into(), time, diff)))
                                .into_iter()
                                .chain(None.into_iter());
                        }
                        Ok(Datum::MzTimestamp(d)) => {
                            if let Some(upper) = upper_bound {
                                upper_bound = Some(upper.min(d));
                            } else {
                                upper_bound = Some(d);
                            };
                            // Force the upper bound to be at least the lower
                            // bound. The `is_some()` test should always be true
                            // due to the above block, but maintain it here in
                            // case that changes. It's hopefully optimized away.
                            if upper_bound.is_some() && upper_bound < Some(lower_bound) {
                                upper_bound = Some(lower_bound);
                            }
                        }
                        Ok(Datum::Null) => {
                            null_eval = true;
                        }
                        x => {
                            panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
                        }
                    }
                }
            }

            // If the upper bound exceeds our `until` frontier, it should not appear in the output.
            // Clearing it means no retraction is emitted below.
            if let Some(upper) = &mut upper_bound {
                if !valid_time(upper) {
                    upper_bound = None;
                }
            }

            // Produce an output only if the upper bound exceeds the lower bound,
            // and if we did not encounter a `null` in our evaluation.
            if Some(lower_bound) != upper_bound && !null_eval {
                row_builder
                    .packer()
                    .extend(self.mfp.mfp.projection.iter().map(|c| datums[*c]));
                // Insert the record at `lower_bound`, and if bounded retract it
                // again at `upper_bound`.
                let upper_opt =
                    upper_bound.map(|upper_bound| Ok((row_builder.clone(), upper_bound, -diff)));
                let lower = Some(Ok((row_builder.clone(), lower_bound, diff)));
                lower.into_iter().chain(upper_opt)
            } else {
                None.into_iter().chain(None)
            }
        }

        /// Returns true if evaluation could introduce an error on non-error inputs.
        pub fn could_error(&self) -> bool {
            self.mfp.could_error()
                || self.lower_bounds.iter().any(|e| e.could_error())
                || self.upper_bounds.iter().any(|e| e.could_error())
        }

        /// Indicates that `Self` ignores its input to the extent that it can be evaluated on `&[]`.
        ///
        /// At the moment, this is only true if it projects away all columns and applies no filters,
        /// but it could be extended to plans that produce literals independent of the input.
        pub fn ignores_input(&self) -> bool {
            self.lower_bounds.is_empty()
                && self.upper_bounds.is_empty()
                && self.mfp.mfp.projection.is_empty()
                && self.mfp.mfp.predicates.is_empty()
        }
    }
1972}