// mz_expr/linear.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Display;

use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::{Datum, Row};
use proptest::prelude::*;
use serde::{Deserialize, Serialize};

use crate::linear::proto_map_filter_project::ProtoPredicate;
use crate::visit::Visit;
use crate::{MirRelationExpr, MirScalarExpr};

include!(concat!(env!("OUT_DIR"), "/mz_expr.linear.rs"));
22
/// A compound operator that can be applied row-by-row.
///
/// This operator integrates the map, filter, and project operators.
/// It applies a sequence of map expressions, which are allowed to
/// refer to previous expressions, interleaved with predicates which
/// must be satisfied for an output to be produced. If all predicates
/// evaluate to `Datum::True` the data at the identified columns are
/// collected and produced as output in a packed `Row`.
///
/// This operator is a "builder" and its contents may contain expressions
/// that are not yet executable. For example, it may contain temporal
/// expressions in `self.expressions`, even though this is not something
/// we can directly evaluate. The plan creation methods will defensively
/// ensure that the right thing happens.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Ord, PartialOrd)]
pub struct MapFilterProject {
    /// A sequence of expressions that should be appended to the row.
    ///
    /// Many of these expressions may not be produced in the output,
    /// and may only be present as common subexpressions.
    pub expressions: Vec<MirScalarExpr>,
    /// Expressions that must evaluate to `Datum::True` for the output
    /// row to be produced.
    ///
    /// Each entry is prepended with a column identifier indicating
    /// the column *before* which the predicate should first be applied.
    /// Most commonly this would be one plus the largest column identifier
    /// in the predicate's support, but it could be larger to implement
    /// guarded evaluation of predicates.
    ///
    /// This list should be sorted by the first field.
    pub predicates: Vec<(usize, MirScalarExpr)>,
    /// A sequence of column identifiers whose data form the output row.
    pub projection: Vec<usize>,
    /// The expected number of input columns.
    ///
    /// This is needed to ensure correct identification of newly formed
    /// columns in the output.
    pub input_arity: usize,
}
63
64/// Use a custom [`Arbitrary`] implementation to cap the number of internal
65/// [`MirScalarExpr`] referenced by the generated [`MapFilterProject`] instances.
66impl Arbitrary for MapFilterProject {
67 type Parameters = ();
68 type Strategy = BoxedStrategy<Self>;
69
70 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
71 (
72 prop::collection::vec(any::<MirScalarExpr>(), 0..10),
73 prop::collection::vec((any::<usize>(), any::<MirScalarExpr>()), 0..10),
74 prop::collection::vec(any::<usize>(), 0..10),
75 usize::arbitrary(),
76 )
77 .prop_map(
78 |(expressions, predicates, projection, input_arity)| MapFilterProject {
79 expressions,
80 predicates,
81 projection,
82 input_arity,
83 },
84 )
85 .boxed()
86 }
87}
88
impl RustType<ProtoMapFilterProject> for MapFilterProject {
    // Field-by-field conversion into the protobuf-generated struct.
    fn into_proto(&self) -> ProtoMapFilterProject {
        ProtoMapFilterProject {
            expressions: self.expressions.into_proto(),
            predicates: self.predicates.into_proto(),
            projection: self.projection.into_proto(),
            input_arity: self.input_arity.into_proto(),
        }
    }

    // Field-by-field conversion back from protobuf; any field that fails to
    // convert aborts the whole conversion via `?`.
    fn from_proto(proto: ProtoMapFilterProject) -> Result<Self, TryFromProtoError> {
        Ok(MapFilterProject {
            expressions: proto.expressions.into_rust()?,
            predicates: proto.predicates.into_rust()?,
            projection: proto.projection.into_rust()?,
            input_arity: proto.input_arity.into_rust()?,
        })
    }
}
108
// A predicate entry is a pair of (column before which to apply, expression);
// it round-trips through `ProtoPredicate`.
impl RustType<ProtoPredicate> for (usize, MirScalarExpr) {
    fn into_proto(&self) -> ProtoPredicate {
        ProtoPredicate {
            column_to_apply: self.0.into_proto(),
            // The predicate expression is a required message field, hence `Some`.
            predicate: Some(self.1.into_proto()),
        }
    }

    fn from_proto(proto: ProtoPredicate) -> Result<Self, TryFromProtoError> {
        Ok((
            proto.column_to_apply.into_rust()?,
            // Error if the required `predicate` field is absent.
            proto
                .predicate
                .into_rust_if_some("ProtoPredicate::predicate")?,
        ))
    }
}
126
127impl Display for MapFilterProject {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 writeln!(f, "MapFilterProject(")?;
130 writeln!(f, " expressions:")?;
131 self.expressions
132 .iter()
133 .enumerate()
134 .try_for_each(|(i, e)| writeln!(f, " #{} <- {},", i + self.input_arity, e))?;
135 writeln!(f, " predicates:")?;
136 self.predicates
137 .iter()
138 .try_for_each(|(before, p)| writeln!(f, " <before: {}> {},", before, p))?;
139 writeln!(f, " projection: {:?}", self.projection)?;
140 writeln!(f, " input_arity: {}", self.input_arity)?;
141 writeln!(f, ")")
142 }
143}
144
145impl MapFilterProject {
146 /// Create a no-op operator for an input of a supplied arity.
147 pub fn new(input_arity: usize) -> Self {
148 Self {
149 expressions: Vec::new(),
150 predicates: Vec::new(),
151 projection: (0..input_arity).collect(),
152 input_arity,
153 }
154 }
155
156 /// Given two mfps, return an mfp that applies one
157 /// followed by the other.
158 /// Note that the arguments are in the opposite order
159 /// from how function composition is usually written in mathematics.
160 pub fn compose(before: Self, after: Self) -> Self {
161 let (m, f, p) = after.into_map_filter_project();
162 before.map(m).filter(f).project(p)
163 }
164
165 /// True if the operator describes the identity transformation.
166 pub fn is_identity(&self) -> bool {
167 self.expressions.is_empty()
168 && self.predicates.is_empty()
169 && self.projection.len() == self.input_arity
170 && self.projection.iter().enumerate().all(|(i, p)| i == *p)
171 }
172
173 /// Retain only the indicated columns in the presented order.
174 pub fn project<I>(mut self, columns: I) -> Self
175 where
176 I: IntoIterator<Item = usize> + std::fmt::Debug,
177 {
178 self.projection = columns.into_iter().map(|c| self.projection[c]).collect();
179 self
180 }
181
182 /// Retain only rows satisfying these predicates.
183 ///
184 /// This method introduces predicates as eagerly as they can be evaluated,
185 /// which may not be desired for predicates that may cause exceptions.
186 /// If fine manipulation is required, the predicates can be added manually.
187 pub fn filter<I>(mut self, predicates: I) -> Self
188 where
189 I: IntoIterator<Item = MirScalarExpr>,
190 {
191 for mut predicate in predicates {
192 // Correct column references.
193 predicate.permute(&self.projection[..]);
194
195 // Validate column references.
196 assert!(
197 predicate
198 .support()
199 .into_iter()
200 .all(|c| c < self.input_arity + self.expressions.len())
201 );
202
203 // Insert predicate as eagerly as it can be evaluated:
204 // just after the largest column in its support is formed.
205 let max_support = predicate
206 .support()
207 .into_iter()
208 .max()
209 .map(|c| c + 1)
210 .unwrap_or(0);
211 self.predicates.push((max_support, predicate))
212 }
213 // Stable sort predicates by position at which they take effect.
214 // We put literal errors at the end as a stop-gap to avoid erroring
215 // before we are able to evaluate any predicates that might prevent it.
216 self.predicates
217 .sort_by_key(|(position, predicate)| (predicate.is_literal_err(), *position));
218 self
219 }
220
221 /// Append the result of evaluating expressions to each row.
222 pub fn map<I>(mut self, expressions: I) -> Self
223 where
224 I: IntoIterator<Item = MirScalarExpr>,
225 {
226 for mut expression in expressions {
227 // Correct column references.
228 expression.permute(&self.projection[..]);
229
230 // Validate column references.
231 assert!(
232 expression
233 .support()
234 .into_iter()
235 .all(|c| c < self.input_arity + self.expressions.len())
236 );
237
238 // Introduce expression and produce as output.
239 self.expressions.push(expression);
240 self.projection
241 .push(self.input_arity + self.expressions.len() - 1);
242 }
243
244 self
245 }
246
247 /// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning.
248 pub fn into_map_filter_project(self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
249 let predicates = self
250 .predicates
251 .into_iter()
252 .map(|(_pos, predicate)| predicate)
253 .collect();
254 (self.expressions, predicates, self.projection)
255 }
256
    /// As the arguments to `Map`, `Filter`, and `Project` operators.
    ///
    /// In principle, this operator can be implemented as a sequence of
    /// more elemental operators, likely less efficiently.
    pub fn as_map_filter_project(&self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
        // Clone `self` and defer to the consuming variant.
        self.clone().into_map_filter_project()
    }
264
    /// Determines if a scalar expression must be equal to a literal datum.
    ///
    /// Scans the predicates for an equality comparison between `expr` and a
    /// literal, in either argument position, and returns the first literal
    /// found. Only non-error literals (`Some(Ok(..))`) are considered.
    pub fn literal_constraint(&self, expr: &MirScalarExpr) -> Option<Datum> {
        for (_pos, predicate) in self.predicates.iter() {
            if let MirScalarExpr::CallBinary {
                func: crate::BinaryFunc::Eq,
                expr1,
                expr2,
            } = predicate
            {
                // The literal may appear on either side of the equality.
                if let Some(Ok(datum1)) = expr1.as_literal() {
                    if &**expr2 == expr {
                        return Some(datum1);
                    }
                }
                if let Some(Ok(datum2)) = expr2.as_literal() {
                    if &**expr1 == expr {
                        return Some(datum2);
                    }
                }
            }
        }
        None
    }
288
289 /// Determines if a sequence of scalar expressions must be equal to a literal row.
290 ///
291 /// This method returns `None` on an empty `exprs`, which might be surprising, but
292 /// seems to line up with its callers' expectations of that being a non-constraint.
293 /// The caller knows if `exprs` is empty, and can modify their behavior appropriately.
294 /// if they would rather have a literal empty row.
295 pub fn literal_constraints(&self, exprs: &[MirScalarExpr]) -> Option<Row> {
296 if exprs.is_empty() {
297 return None;
298 }
299 let mut row = Row::default();
300 let mut packer = row.packer();
301 for expr in exprs {
302 if let Some(literal) = self.literal_constraint(expr) {
303 packer.push(literal);
304 } else {
305 return None;
306 }
307 }
308 Some(row)
309 }
310
    /// Extracts any MapFilterProject at the root of the expression.
    ///
    /// The expression will be modified to extract any maps, filters, and
    /// projections, which will be returned as `Self`. If there are no maps,
    /// filters, or projections the method will return an identity operator.
    ///
    /// The extracted expressions may contain temporal predicates, and one
    /// should be careful not to apply them blindly.
    pub fn extract_from_expression(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
        // TODO: This could become iterative rather than recursive if
        // we were able to fuse MFP operators from below, rather than
        // from above.
        match expr {
            // Descend through Map/Filter/Project, absorbing each into the MFP.
            MirRelationExpr::Map { input, scalars } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.map(scalars.iter().cloned()), expr)
            }
            MirRelationExpr::Filter { input, predicates } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.filter(predicates.iter().cloned()), expr)
            }
            MirRelationExpr::Project { input, outputs } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.project(outputs.iter().cloned()), expr)
            }
            // Any other operator terminates the extraction with an identity MFP.
            // TODO: The recursion is quadratic in the number of Map/Filter/Project operators due to
            // this call to `arity()`.
            x => (Self::new(x.arity()), x),
        }
    }
341
    /// Extracts an error-free MapFilterProject at the root of the expression.
    ///
    /// The expression will be modified to extract maps, filters, and projects
    /// from the root of the expression, which will be returned as `Self`. The
    /// extraction will halt if a Map or Filter containing a literal error is
    /// reached. If nothing can be extracted, the method will return an
    /// identity operator.
    ///
    /// This method is meant to be used during optimization, where it is
    /// necessary to avoid moving around maps and filters with errors.
    pub fn extract_non_errors_from_expr(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
        match expr {
            // Absorb a Map only if none of its scalars is a literal error.
            MirRelationExpr::Map { input, scalars }
                if scalars.iter().all(|s| !s.is_literal_err()) =>
            {
                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
                (mfp.map(scalars.iter().cloned()), expr)
            }
            // Absorb a Filter only if none of its predicates is a literal error.
            MirRelationExpr::Filter { input, predicates }
                if predicates.iter().all(|p| !p.is_literal_err()) =>
            {
                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
                (mfp.filter(predicates.iter().cloned()), expr)
            }
            // Projects cannot contain errors and are always absorbed.
            MirRelationExpr::Project { input, outputs } => {
                let (mfp, expr) = Self::extract_non_errors_from_expr(input);
                (mfp.project(outputs.iter().cloned()), expr)
            }
            x => (Self::new(x.arity()), x),
        }
    }
372
    /// Extracts an error-free MapFilterProject at the root of the expression.
    ///
    /// Differs from [MapFilterProject::extract_non_errors_from_expr] by taking and returning a
    /// mutable reference.
    pub fn extract_non_errors_from_expr_ref_mut(
        expr: &mut MirRelationExpr,
    ) -> (Self, &mut MirRelationExpr) {
        // This is essentially the same code as `extract_non_errors_from_expr`, except the seemingly
        // superfluous outer if, which works around a borrow-checker issue:
        // https://github.com/rust-lang/rust/issues/54663
        if matches!(expr, MirRelationExpr::Map { input: _, scalars } if scalars.iter().all(|s| !s.is_literal_err()))
            || matches!(expr, MirRelationExpr::Filter { input: _, predicates } if predicates.iter().all(|p| !p.is_literal_err()))
            || matches!(expr, MirRelationExpr::Project { .. })
        {
            match expr {
                MirRelationExpr::Map { input, scalars }
                    if scalars.iter().all(|s| !s.is_literal_err()) =>
                {
                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
                    (mfp.map(scalars.iter().cloned()), expr)
                }
                MirRelationExpr::Filter { input, predicates }
                    if predicates.iter().all(|p| !p.is_literal_err()) =>
                {
                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
                    (mfp.filter(predicates.iter().cloned()), expr)
                }
                MirRelationExpr::Project { input, outputs } => {
                    let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
                    (mfp.project(outputs.iter().cloned()), expr)
                }
                // The outer `if` guarantees one of the three arms above matched.
                _ => unreachable!(),
            }
        } else {
            (Self::new(expr.arity()), expr)
        }
    }
410
    /// Removes an error-free MapFilterProject from the root of the expression.
    ///
    /// The expression will be modified to extract maps, filters, and projects
    /// from the root of the expression, which will be returned as `Self`. The
    /// extraction will halt if a Map or Filter containing a literal error is
    /// reached. If nothing can be extracted, the method will return an
    /// identity operator, and the expression will remain unchanged.
    ///
    /// This method is meant to be used during optimization, where it is
    /// necessary to avoid moving around maps and filters with errors.
    pub fn extract_non_errors_from_expr_mut(expr: &mut MirRelationExpr) -> Self {
        match expr {
            // Absorb a Map only if none of its scalars is a literal error; the
            // operator is then removed from the tree via `take_dangerous`.
            MirRelationExpr::Map { input, scalars }
                if scalars.iter().all(|s| !s.is_literal_err()) =>
            {
                let mfp =
                    Self::extract_non_errors_from_expr_mut(input).map(scalars.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            // Absorb a Filter only if none of its predicates is a literal error.
            MirRelationExpr::Filter { input, predicates }
                if predicates.iter().all(|p| !p.is_literal_err()) =>
            {
                let mfp = Self::extract_non_errors_from_expr_mut(input)
                    .filter(predicates.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            // Projects cannot contain errors and are always absorbed.
            MirRelationExpr::Project { input, outputs } => {
                let mfp =
                    Self::extract_non_errors_from_expr_mut(input).project(outputs.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            // Anything else leaves the expression untouched.
            x => Self::new(x.arity()),
        }
    }
448
    /// Extracts temporal predicates into their own `Self`.
    ///
    /// Expressions that are used by the temporal predicates are exposed by `self.projection`,
    /// though there could be justification for extracting them as well if they are otherwise
    /// unused.
    ///
    /// This separation is valuable when the execution cannot be fused into one operator.
    pub fn extract_temporal(&mut self) -> Self {
        // Optimize the expression, as it is only post-optimization that we can be certain
        // that temporal expressions are restricted to filters. We could relax this in the
        // future to be only `inline_expressions` and `remove_undemanded`, but optimization
        // seems to be the best fit at the moment.
        self.optimize();

        // Assert that we no longer have temporal expressions to evaluate. This should only
        // occur if the optimization above results with temporal expressions yielded in the
        // output, which is out of spec for how the type is meant to be used.
        assert!(!self.expressions.iter().any(|e| e.contains_temporal()));

        // Extract temporal predicates from `self.predicates`, removing them
        // from `self` in the same pass.
        let mut temporal_predicates = Vec::new();
        self.predicates.retain(|(_position, predicate)| {
            if predicate.contains_temporal() {
                temporal_predicates.push(predicate.clone());
                false
            } else {
                true
            }
        });

        // Determine extended input columns used by temporal filters.
        let mut support = BTreeSet::new();
        for predicate in temporal_predicates.iter() {
            support.extend(predicate.support());
        }

        // Discover the locations of these columns after `self.projection`,
        // appending to the projection any column not already produced.
        let old_projection_len = self.projection.len();
        let mut new_location = BTreeMap::new();
        for original in support.iter() {
            if let Some(position) = self.projection.iter().position(|x| x == original) {
                new_location.insert(*original, position);
            } else {
                new_location.insert(*original, self.projection.len());
                self.projection.push(*original);
            }
        }
        // Permute references in extracted predicates to their new locations.
        for predicate in temporal_predicates.iter_mut() {
            predicate.permute_map(&new_location);
        }

        // Form a new `Self` containing the temporal predicates to return.
        // Its projection restores the original output columns, discarding any
        // columns appended above purely for the temporal predicates' benefit.
        Self::new(self.projection.len())
            .filter(temporal_predicates)
            .project(0..old_projection_len)
    }
506
    /// Extracts common expressions from multiple `Self` into a result `Self`.
    ///
    /// The argument `mfps` are mutated so that each are functionally equivalent to their
    /// corresponding input, when composed atop the resulting `Self`.
    ///
    /// # Panics
    ///
    /// Panics if `mfps` is empty.
    pub fn extract_common(mfps: &mut [&mut Self]) -> Self {
        match mfps.len() {
            0 => {
                panic!("Cannot call method on empty arguments");
            }
            1 => {
                // A single mfp moves wholesale into the result, and is replaced
                // by an identity operator over its output arity.
                let output_arity = mfps[0].projection.len();
                std::mem::replace(mfps[0], MapFilterProject::new(output_arity))
            }
            _ => {
                // More generally, we convert each mfp to ANF, at which point we can
                // repeatedly extract atomic expressions that depend only on input
                // columns, migrate them to an input mfp, and repeat until no such
                // expressions exist. At this point, we can also migrate predicates
                // and then determine and push down projections.

                // Prepare a return `Self`.
                let mut result_mfp = MapFilterProject::new(mfps[0].input_arity);

                // We convert each mfp to ANF, using `memoize_expressions`.
                for mfp in mfps.iter_mut() {
                    mfp.memoize_expressions();
                }

                // We repeatedly extract common expressions, until none remain.
                let mut done = false;
                while !done {
                    // We use references to determine common expressions, and must
                    // introduce a scope here to drop the borrows before mutation.
                    let common = {
                        // The input arity may increase as we iterate, so recapture.
                        let input_arity = result_mfp.projection.len();
                        // Candidates: expressions of the first mfp that depend
                        // only on (current) input columns.
                        let mut prev: BTreeSet<_> = mfps[0]
                            .expressions
                            .iter()
                            .filter(|e| e.support().iter().max() < Some(&input_arity))
                            .collect();
                        // Intersect the candidates with each remaining mfp's expressions.
                        let mut next = BTreeSet::default();
                        for mfp in mfps[1..].iter() {
                            for expr in mfp.expressions.iter() {
                                if prev.contains(expr) {
                                    next.insert(expr);
                                }
                            }
                            std::mem::swap(&mut prev, &mut next);
                            next.clear();
                        }
                        prev.into_iter().cloned().collect::<Vec<_>>()
                    };
                    // Without new common expressions, we should terminate the loop.
                    done = common.is_empty();

                    // Migrate each expression in `common` to `result_mfp`.
                    for expr in common.into_iter() {
                        // Update each mfp by removing expr and updating column references.
                        for mfp in mfps.iter_mut() {
                            // With `expr` next in `result_mfp`, it is as if we are rotating it to
                            // be the first expression in `mfp`, and then removing it from `mfp` and
                            // increasing the input arity of `mfp`.
                            let arity = result_mfp.projection.len();
                            let found = mfp.expressions.iter().position(|e| e == &expr).unwrap();
                            let index = arity + found;
                            // Column references change due to the rotation from `index` to `arity`.
                            let action = |c: &mut usize| {
                                if arity <= *c && *c < index {
                                    *c += 1;
                                } else if *c == index {
                                    *c = arity;
                                }
                            };
                            // Rotate `expr` from `found` to first, and then snip.
                            // Short circuit by simply removing and incrementing the input arity.
                            mfp.input_arity += 1;
                            mfp.expressions.remove(found);
                            // Update column references in expressions, predicates, and projections.
                            for e in mfp.expressions.iter_mut() {
                                e.visit_columns(action);
                            }
                            for (o, e) in mfp.predicates.iter_mut() {
                                e.visit_columns(action);
                                // Max out the offset for the predicate; optimization will correct.
                                *o = mfp.input_arity + mfp.expressions.len();
                            }
                            for c in mfp.projection.iter_mut() {
                                action(c);
                            }
                        }
                        // Install the expression in the result and project it as output.
                        result_mfp.expressions.push(expr);
                        result_mfp.projection.push(result_mfp.projection.len());
                    }
                }
                // As before, but easier: predicates in common to all mfps.
                let common_preds: Vec<MirScalarExpr> = {
                    let input_arity = result_mfp.projection.len();
                    let mut prev: BTreeSet<_> = mfps[0]
                        .predicates
                        .iter()
                        .map(|(_, e)| e)
                        .filter(|e| e.support().iter().max() < Some(&input_arity))
                        .collect();
                    let mut next = BTreeSet::default();
                    for mfp in mfps[1..].iter() {
                        for (_, expr) in mfp.predicates.iter() {
                            if prev.contains(expr) {
                                next.insert(expr);
                            }
                        }
                        std::mem::swap(&mut prev, &mut next);
                        next.clear();
                    }
                    // Predicates in common, that we will append to `result_mfp.predicates`.
                    prev.into_iter().cloned().collect::<Vec<_>>()
                };
                for mfp in mfps.iter_mut() {
                    mfp.predicates.retain(|(_, p)| !common_preds.contains(p));
                    mfp.optimize();
                }
                result_mfp.predicates.extend(
                    common_preds
                        .into_iter()
                        .map(|e| (result_mfp.projection.len(), e)),
                );

                // Then, look for unused columns and project them away.
                let mut common_demand = BTreeSet::new();
                for mfp in mfps.iter() {
                    common_demand.extend(mfp.demand());
                }
                // columns in `common_demand` must be retained, but others
                // may be discarded.
                let common_demand = (0..result_mfp.projection.len())
                    .filter(|x| common_demand.contains(x))
                    .collect::<Vec<_>>();
                let remap = common_demand
                    .iter()
                    .cloned()
                    .enumerate()
                    .map(|(new, old)| (old, new))
                    .collect::<BTreeMap<_, _>>();
                for mfp in mfps.iter_mut() {
                    mfp.permute_fn(|c| remap[&c], common_demand.len());
                }
                result_mfp = result_mfp.project(common_demand);

                // Return the resulting MFP.
                result_mfp.optimize();
                result_mfp
            }
        }
    }
664
665 /// Returns `self`, and leaves behind an identity operator that acts on its output.
666 pub fn take(&mut self) -> Self {
667 let mut identity = Self::new(self.projection.len());
668 std::mem::swap(self, &mut identity);
669 identity
670 }
671
    /// Convert the `MapFilterProject` into a staged evaluation plan.
    ///
    /// The main behavior is to extract temporal predicates, which cannot be evaluated
    /// using the standard machinery.
    pub fn into_plan(self) -> Result<plan::MfpPlan, String> {
        plan::MfpPlan::create_from(self)
    }
679}
680
681impl MapFilterProject {
682 /// Partitions `self` into two instances, one of which can be eagerly applied.
683 ///
684 /// The `available` argument indicates which input columns are available (keys)
685 /// and in which positions (values). This information may allow some maps and
686 /// filters to execute. The `input_arity` argument reports the total number of
687 /// input columns (which may include some not present in `available`)
688 ///
689 /// This method partitions `self` in two parts, `(before, after)`, where `before`
690 /// can be applied on columns present as keys in `available`, and `after` must
691 /// await the introduction of the other input columns.
692 ///
693 /// The `before` instance will *append* any columns that can be determined from
694 /// `available` but will project away any of these columns that are not needed by
695 /// `after`. Importantly, this means that `before` will leave intact *all* input
696 /// columns including those not referenced in `available`.
697 ///
698 /// The `after` instance will presume all input columns are available, followed
699 /// by the appended columns of the `before` instance. It may be that some input
700 /// columns can be projected away in `before` if `after` does not need them, but
701 /// we leave that as something the caller can apply if needed (it is otherwise
702 /// complicated to negotiate which input columns `before` should retain).
703 ///
704 /// To correctly reconstruct `self` from `before` and `after`, one must introduce
705 /// additional input columns, permute all input columns to their locations as
706 /// expected by `self`, follow this by new columns appended by `before`, and
707 /// remove all other columns that may be present.
708 ///
709 /// # Example
710 ///
711 /// ```rust
712 /// use mz_expr::{BinaryFunc, MapFilterProject, MirScalarExpr};
713 ///
714 /// // imagine an action on columns (a, b, c, d).
715 /// let original = MapFilterProject::new(4).map(vec![
716 /// MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::AddInt64),
717 /// MirScalarExpr::column(2).call_binary(MirScalarExpr::column(4), BinaryFunc::AddInt64),
718 /// MirScalarExpr::column(3).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
719 /// ]).project(vec![6]);
720 ///
721 /// // Imagine we start with columns (b, x, a, y, c).
722 /// //
723 /// // The `partition` method requires a map from *expected* input columns to *actual*
724 /// // input columns. In the example above, the columns a, b, and c exist, and are at
725 /// // locations 2, 0, and 4 respectively. We must construct a map to this effect.
726 /// let mut available_columns = std::collections::BTreeMap::new();
727 /// available_columns.insert(0, 2);
728 /// available_columns.insert(1, 0);
729 /// available_columns.insert(2, 4);
730 /// // Partition `original` using the available columns and current input arity.
731 /// // This informs `partition` which columns are available, where they can be found,
732 /// // and how many columns are not relevant but should be preserved.
733 /// let (before, after) = original.partition(available_columns, 5);
734 ///
735 /// // `before` sees all five input columns, and should append `a + b + c`.
736 /// assert_eq!(before, MapFilterProject::new(5).map(vec![
737 /// MirScalarExpr::column(2).call_binary(MirScalarExpr::column(0), BinaryFunc::AddInt64),
738 /// MirScalarExpr::column(4).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
739 /// ]).project(vec![0, 1, 2, 3, 4, 6]));
740 ///
741 /// // `after` expects to see `(a, b, c, d, a + b + c)`.
742 /// assert_eq!(after, MapFilterProject::new(5).map(vec![
743 /// MirScalarExpr::column(3).call_binary(MirScalarExpr::column(4), BinaryFunc::AddInt64)
744 /// ]).project(vec![5]));
745 ///
746 /// // To reconstruct `self`, we must introduce the columns that are not present,
747 /// // and present them in the order intended by `self`. In this example, we must
748 /// // introduce column d and permute the columns so that they begin (a, b, c, d).
749 /// // The columns x and y must be projected away, and any columns introduced by
750 /// // `begin` must be retained in their current order.
751 ///
752 /// // The `after` instance expects to be provided with all inputs, but it
753 /// // may not need all inputs. The `demand()` and `permute()` methods can
754 /// // optimize the representation.
755 /// ```
756 pub fn partition(self, available: BTreeMap<usize, usize>, input_arity: usize) -> (Self, Self) {
757 // Map expressions, filter predicates, and projections for `before` and `after`.
758 let mut before_expr = Vec::new();
759 let mut before_pred = Vec::new();
760 let mut before_proj = Vec::new();
761 let mut after_expr = Vec::new();
762 let mut after_pred = Vec::new();
763 let mut after_proj = Vec::new();
764
765 // Track which output columns must be preserved in the output of `before`.
766 let mut demanded = BTreeSet::new();
767 demanded.extend(0..self.input_arity);
768 demanded.extend(self.projection.iter());
769
770 // Determine which map expressions can be computed from the available subset.
771 // Some expressions may depend on other expressions, but by evaluating them
772 // in forward order we should accurately determine the available expressions.
773 let mut available_expr = vec![false; self.input_arity];
774 // Initialize available columns from `available`, which is then not used again.
775 for index in available.keys() {
776 available_expr[*index] = true;
777 }
778 for expr in self.expressions.into_iter() {
779 // We treat an expression as available if its supporting columns are available,
780 // and if it is not a literal (we want to avoid pushing down literals). This
781 // choice is ad-hoc, but the intent is that we partition the operators so
782 // that we can reduce the row representation size and total computation.
783 // Pushing down literals harms the former and does nothing for the latter.
784 // In the future, we'll want to have a harder think about this trade-off, as
785 // we are certainly making sub-optimal decisions by pushing down all available
786 // work.
787 // TODO(mcsherry): establish better principles about what work to push down.
788 let is_available =
789 expr.support().into_iter().all(|i| available_expr[i]) && !expr.is_literal();
790 if is_available {
791 before_expr.push(expr);
792 } else {
793 demanded.extend(expr.support());
794 after_expr.push(expr);
795 }
796 available_expr.push(is_available);
797 }
798
799 // Determine which predicates can be computed from the available subset.
800 for (_when, pred) in self.predicates.into_iter() {
801 let is_available = pred.support().into_iter().all(|i| available_expr[i]);
802 if is_available {
803 before_pred.push(pred);
804 } else {
805 demanded.extend(pred.support());
806 after_pred.push(pred);
807 }
808 }
809
810 // Map from prior output location to location in un-projected `before`.
811 // This map is used to correct references in `before` but it should be
812 // adjusted to reflect `before`s projection prior to use in `after`.
813 let mut before_map = available;
814 // Input columns include any additional undescribed columns that may
815 // not be captured by the `available` argument, so we must independently
816 // track the current number of columns (vs relying on `before_map.len()`).
817 let mut input_columns = input_arity;
818 for index in self.input_arity..available_expr.len() {
819 if available_expr[index] {
820 before_map.insert(index, input_columns);
821 input_columns += 1;
822 }
823 }
824
825 // Permute the column references in `before` expressions and predicates.
826 for expr in before_expr.iter_mut() {
827 expr.permute_map(&before_map);
828 }
829 for pred in before_pred.iter_mut() {
830 pred.permute_map(&before_map);
831 }
832
833 // Demand information determines `before`s output projection.
834 // Specifically, we produce all input columns in the output, as well as
835 // any columns that are available and demanded.
836 before_proj.extend(0..input_arity);
837 for index in self.input_arity..available_expr.len() {
838 // If an intermediate result is both available and demanded,
839 // we should produce it as output.
840 if available_expr[index] && demanded.contains(&index) {
841 // Use the new location of `index`.
842 before_proj.push(before_map[&index]);
843 }
844 }
845
846 // Map from prior output locations to location in post-`before` columns.
847 // This map is used to correct references in `after`.
848 // The presumption is that `after` will be presented with all input columns,
849 // followed by the output columns introduced by `before` in order.
850 let mut after_map = BTreeMap::new();
851 for index in 0..self.input_arity {
852 after_map.insert(index, index);
853 }
854 for index in self.input_arity..available_expr.len() {
855 // If an intermediate result is both available and demanded,
856 // it was produced as output.
857 if available_expr[index] && demanded.contains(&index) {
858 // We expect to find the output as far after `self.input_arity` as
859 // it was produced after `input_arity` in the output of `before`.
860 let location = self.input_arity
861 + (before_proj
862 .iter()
863 .position(|x| x == &before_map[&index])
864 .unwrap()
865 - input_arity);
866 after_map.insert(index, location);
867 }
868 }
869 // We must now re-map the remaining non-demanded expressions, which are
870 // contiguous rather than potentially interspersed.
871 for index in self.input_arity..available_expr.len() {
872 if !available_expr[index] {
873 after_map.insert(index, after_map.len());
874 }
875 }
876
877 // Permute the column references in `after` expressions and predicates.
878 for expr in after_expr.iter_mut() {
879 expr.permute_map(&after_map);
880 }
881 for pred in after_pred.iter_mut() {
882 pred.permute_map(&after_map);
883 }
884 // Populate `after` projection with the new locations of `self.projection`.
885 for index in self.projection {
886 after_proj.push(after_map[&index]);
887 }
888
889 // Form and return the before and after MapFilterProject instances.
890 let before = Self::new(input_arity)
891 .map(before_expr)
892 .filter(before_pred)
893 .project(before_proj.clone());
894 let after = Self::new(self.input_arity + (before_proj.len() - input_arity))
895 .map(after_expr)
896 .filter(after_pred)
897 .project(after_proj);
898 (before, after)
899 }
900
901 /// Lists input columns whose values are used in outputs.
902 ///
903 /// It is entirely appropriate to determine the demand of an instance
904 /// and then both apply a projection to the subject of the instance and
905 /// `self.permute` this instance.
906 pub fn demand(&self) -> BTreeSet<usize> {
907 let mut demanded = BTreeSet::new();
908 for (_index, pred) in self.predicates.iter() {
909 demanded.extend(pred.support());
910 }
911 demanded.extend(self.projection.iter().cloned());
912 for index in (0..self.expressions.len()).rev() {
913 if demanded.contains(&(self.input_arity + index)) {
914 demanded.extend(self.expressions[index].support());
915 }
916 }
917 demanded.retain(|col| col < &self.input_arity);
918 demanded
919 }
920
921 /// Update input column references, due to an input projection or permutation.
922 ///
923 /// The `shuffle` argument remaps expected column identifiers to new locations,
924 /// with the expectation that `shuffle` describes all input columns, and so the
925 /// intermediate results will be able to start at position `shuffle.len()`.
926 ///
927 /// The supplied `shuffle` might not list columns that are not "demanded" by the
928 /// instance, and so we should ensure that `self` is optimized to not reference
929 /// columns that are not demanded.
930 pub fn permute_fn<F>(&mut self, remap: F, new_input_arity: usize)
931 where
932 F: Fn(usize) -> usize,
933 {
934 let (mut map, mut filter, mut project) = self.as_map_filter_project();
935 let map_len = map.len();
936 let action = |col: &mut usize| {
937 if self.input_arity <= *col && *col < self.input_arity + map_len {
938 *col = new_input_arity + (*col - self.input_arity);
939 } else {
940 *col = remap(*col);
941 }
942 };
943 for expr in map.iter_mut() {
944 expr.visit_columns(action);
945 }
946 for pred in filter.iter_mut() {
947 pred.visit_columns(action);
948 }
949 for proj in project.iter_mut() {
950 action(proj);
951 assert!(*proj < new_input_arity + map.len());
952 }
953 *self = Self::new(new_input_arity)
954 .map(map)
955 .filter(filter)
956 .project(project)
957 }
958}
959
960// Optimization routines.
961impl MapFilterProject {
962 /// Optimize the internal expression evaluation order.
963 ///
964 /// This method performs several optimizations that are meant to streamline
965 /// the execution of the `MapFilterProject` instance, but not to alter its
966 /// semantics. This includes extracting expressions that are used multiple
967 /// times, inlining those that are not, and removing expressions that are
968 /// unreferenced.
969 ///
970 /// This method will inline all temporal expressions, and remove any columns
971 /// that are not demanded by the output, which should transform any temporal
972 /// filters to a state where the temporal expressions exist only in the list
973 /// of predicates.
974 ///
975 /// # Example
976 ///
977 /// This example demonstrates how the re-use of one expression, converting
978 /// column 1 from a string to an integer, can be extracted and the results
979 /// shared among the two uses. This example is used for each of the steps
980 /// along the optimization path.
981 ///
982 /// ```rust
983 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
984 /// // Demonstrate extraction of common expressions (here: parsing strings).
985 /// let mut map_filter_project = MapFilterProject::new(5)
986 /// .map(vec![
987 /// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
988 /// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
989 /// ])
990 /// .project(vec![3,4,5,6]);
991 ///
992 /// let mut expected_optimized = MapFilterProject::new(5)
993 /// .map(vec![
994 /// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
995 /// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
996 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
997 /// ])
998 /// .project(vec![3,4,6,7]);
999 ///
1000 /// // Optimize the expression.
1001 /// map_filter_project.optimize();
1002 ///
1003 /// assert_eq!(
1004 /// map_filter_project,
1005 /// expected_optimized,
1006 /// );
1007 /// ```
1008 pub fn optimize(&mut self) {
1009 // Track sizes and iterate as long as they decrease.
1010 let mut prev_size = None;
1011 let mut self_size = usize::max_value();
1012 // Continue as long as strict improvements occur.
1013 while prev_size.map(|p| self_size < p).unwrap_or(true) {
1014 // Lock in current size.
1015 prev_size = Some(self_size);
1016
1017 // We have an annoying pattern of mapping literals that already exist as columns (by filters).
1018 // Try to identify this pattern, of a map that introduces an expression equated to a prior column,
1019 // and then replace the mapped expression by a column reference.
1020 //
1021 // We think this is due to `LiteralLifting`, and we might investigate removing the introduciton in
1022 // the first place. The tell-tale that we see when we fix is a diff that look likes
1023 //
1024 // - Project (#0, #2)
1025 // - Filter (#1 = 1)
1026 // - Map (1)
1027 // - Get l0
1028 // + Filter (#1 = 1)
1029 // + Get l0
1030 //
1031 for (index, expr) in self.expressions.iter_mut().enumerate() {
1032 // If `expr` matches a filter equating it to a column < index + input_arity, rewrite it
1033 for (_, predicate) in self.predicates.iter() {
1034 if let MirScalarExpr::CallBinary {
1035 func: crate::BinaryFunc::Eq,
1036 expr1,
1037 expr2,
1038 } = predicate
1039 {
1040 if let MirScalarExpr::Column(c, name) = &**expr1 {
1041 if *c < index + self.input_arity && &**expr2 == expr {
1042 *expr = MirScalarExpr::Column(*c, name.clone());
1043 }
1044 }
1045 if let MirScalarExpr::Column(c, name) = &**expr2 {
1046 if *c < index + self.input_arity && &**expr1 == expr {
1047 *expr = MirScalarExpr::Column(*c, name.clone());
1048 }
1049 }
1050 }
1051 }
1052 }
1053
1054 // Optimization memoizes individual `ScalarExpr` expressions that
1055 // are sure to be evaluated, canonicalizes references to the first
1056 // occurrence of each, inlines expressions that have a reference
1057 // count of one, and then removes any expressions that are not
1058 // referenced.
1059 self.memoize_expressions();
1060 self.predicates.sort();
1061 self.predicates.dedup();
1062 self.inline_expressions();
1063 self.remove_undemanded();
1064
1065 // Re-build `self` from parts to restore evaluation order invariants.
1066 let (map, filter, project) = self.as_map_filter_project();
1067 *self = Self::new(self.input_arity)
1068 .map(map)
1069 .filter(filter)
1070 .project(project);
1071
1072 self_size = self.size();
1073 }
1074 }
1075
1076 /// Total expression sizes across all expressions.
1077 pub fn size(&self) -> usize {
1078 self.expressions.iter().map(|e| e.size()).sum::<usize>()
1079 + self.predicates.iter().map(|(_, e)| e.size()).sum::<usize>()
1080 }
1081
1082 /// Place each certainly evaluated expression in its own column.
1083 ///
1084 /// This method places each non-trivial, certainly evaluated expression
1085 /// in its own column, and deduplicates them so that all references to
1086 /// the same expression reference the same column.
1087 ///
1088 /// This transformation is restricted to expressions we are certain will
1089 /// be evaluated, which does not include expressions in `if` statements.
1090 ///
1091 /// # Example
1092 ///
1093 /// This example demonstrates how memoization notices `MirScalarExpr`s
1094 /// that are used multiple times, and ensures that each are extracted
1095 /// into columns and then referenced by column. This pass does not try
1096 /// to minimize the occurrences of column references, which will happen
1097 /// in inlining.
1098 ///
1099 /// ```rust
1100 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1101 /// // Demonstrate extraction of common expressions (here: parsing strings).
1102 /// let mut map_filter_project = MapFilterProject::new(5)
1103 /// .map(vec![
1104 /// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
1105 /// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
1106 /// ])
1107 /// .project(vec![3,4,5,6]);
1108 ///
1109 /// let mut expected_optimized = MapFilterProject::new(5)
1110 /// .map(vec![
1111 /// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1112 /// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1113 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
1114 /// MirScalarExpr::column(7),
1115 /// MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1116 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), BinaryFunc::AddInt64),
1117 /// MirScalarExpr::column(10),
1118 /// ])
1119 /// .project(vec![3,4,8,11]);
1120 ///
1121 /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1122 /// map_filter_project.memoize_expressions();
1123 ///
1124 /// assert_eq!(
1125 /// map_filter_project,
1126 /// expected_optimized,
1127 /// );
1128 /// ```
1129 ///
1130 /// Expressions may not be memoized if they are not certain to be evaluated,
1131 /// for example if they occur in conditional branches of a `MirScalarExpr::If`.
1132 ///
1133 /// ```rust
1134 /// use mz_expr::{MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
1135 /// // Demonstrate extraction of unconditionally evaluated expressions, as well as
1136 /// // the non-extraction of common expressions guarded by conditions.
1137 /// let mut map_filter_project = MapFilterProject::new(2)
1138 /// .map(vec![
1139 /// MirScalarExpr::If {
1140 /// cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::Lt)),
1141 /// then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
1142 /// els: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
1143 /// },
1144 /// MirScalarExpr::If {
1145 /// cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::Lt)),
1146 /// then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
1147 /// els: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
1148 /// },
1149 /// ]);
1150 ///
1151 /// let mut expected_optimized = MapFilterProject::new(2)
1152 /// .map(vec![
1153 /// MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::Lt),
1154 /// MirScalarExpr::If {
1155 /// cond: Box::new(MirScalarExpr::column(2)),
1156 /// then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
1157 /// els: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
1158 /// },
1159 /// MirScalarExpr::column(3),
1160 /// MirScalarExpr::If {
1161 /// cond: Box::new(MirScalarExpr::column(2)),
1162 /// then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
1163 /// els: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
1164 /// },
1165 /// MirScalarExpr::column(5),
1166 /// ])
1167 /// .project(vec![0,1,4,6]);
1168 ///
1169 /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
1170 /// map_filter_project.memoize_expressions();
1171 ///
1172 /// assert_eq!(
1173 /// map_filter_project,
1174 /// expected_optimized,
1175 /// );
1176 /// ```
    pub fn memoize_expressions(&mut self) {
        // Record the mapping from starting column references to new column
        // references.
        let mut remaps = BTreeMap::new();
        // Input columns are never relocated.
        for index in 0..self.input_arity {
            remaps.insert(index, index);
        }
        let mut new_expressions = Vec::new();

        // We follow the same order as for evaluation, to ensure that all
        // column references exist in time for their evaluation. We could
        // prioritize predicates, but we would need to be careful to chase
        // down column references to expressions and memoize those as well.
        let mut expression = 0;
        for (support, predicate) in self.predicates.iter_mut() {
            // `support` is the column count at which the predicate can first be
            // evaluated, so memoize every expression the predicate may rely on.
            while self.input_arity + expression < *support {
                self.expressions[expression].permute_map(&remaps);
                memoize_expr(
                    &mut self.expressions[expression],
                    &mut new_expressions,
                    self.input_arity,
                );
                // The expression's new column is the slot it is pushed into next
                // (memoization may have pushed sub-expressions first, so this is
                // `new_expressions.len()` *before* the push below).
                remaps.insert(
                    self.input_arity + expression,
                    self.input_arity + new_expressions.len(),
                );
                new_expressions.push(self.expressions[expression].clone());
                expression += 1;
            }
            predicate.permute_map(&remaps);
            memoize_expr(predicate, &mut new_expressions, self.input_arity);
        }
        // Memoize any remaining expressions not needed by any predicate.
        while expression < self.expressions.len() {
            self.expressions[expression].permute_map(&remaps);
            memoize_expr(
                &mut self.expressions[expression],
                &mut new_expressions,
                self.input_arity,
            );
            remaps.insert(
                self.input_arity + expression,
                self.input_arity + new_expressions.len(),
            );
            new_expressions.push(self.expressions[expression].clone());
            expression += 1;
        }

        self.expressions = new_expressions;
        // Redirect the projection at the relocated columns.
        for proj in self.projection.iter_mut() {
            *proj = remaps[proj];
        }

        // Restore predicate order invariants.
        // Each predicate records the column count at which it can first be
        // evaluated: one past the largest column it references (zero if none).
        for (pos, pred) in self.predicates.iter_mut() {
            *pos = pred.support().into_iter().max().map(|x| x + 1).unwrap_or(0);
        }
    }
1234
1235 /// This method inlines expressions with a single use.
1236 ///
1237 /// This method only inlines expressions; it does not delete expressions
1238 /// that are no longer referenced. The `remove_undemanded()` method does
1239 /// that, and should likely be used after this method.
1240 ///
1241 /// Inlining replaces column references when the referred-to item is either
1242 /// another column reference, or the only referrer of its referent. This
1243 /// is most common after memoization has atomized all expressions to seek
1244 /// out re-use: inlining re-assembles expressions that were not helpfully
1245 /// shared with other expressions.
1246 ///
1247 /// # Example
1248 ///
1249 /// In this example, we see that with only a single reference to columns
1250 /// 0 and 2, their parsing can each be inlined. Similarly, column references
1251 /// can be cleaned up among expressions, and in the final projection.
1252 ///
1253 /// Also notice the remaining expressions, which can be cleaned up in a later
1254 /// pass (the `remove_undemanded` method).
1255 ///
1256 /// ```rust
1257 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
    /// // Use the output from the first `memoize_expressions` example.
1259 /// let mut map_filter_project = MapFilterProject::new(5)
1260 /// .map(vec![
1261 /// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1262 /// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1263 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
1264 /// MirScalarExpr::column(7),
1265 /// MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1266 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), BinaryFunc::AddInt64),
1267 /// MirScalarExpr::column(10),
1268 /// ])
1269 /// .project(vec![3,4,8,11]);
1270 ///
1271 /// let mut expected_optimized = MapFilterProject::new(5)
1272 /// .map(vec![
1273 /// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1274 /// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1275 /// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
1276 /// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
1277 /// MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1278 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
1279 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
1280 /// ])
1281 /// .project(vec![3,4,8,11]);
1282 ///
1283 /// // Inline expressions that are referenced only once.
1284 /// map_filter_project.inline_expressions();
1285 ///
1286 /// assert_eq!(
1287 /// map_filter_project,
1288 /// expected_optimized,
1289 /// );
1290 /// ```
1291 pub fn inline_expressions(&mut self) {
1292 // Local copy of input_arity to avoid borrowing `self` in closures.
1293 let input_arity = self.input_arity;
1294 // Reference counts track the number of places that a reference occurs.
1295 let mut reference_count = vec![0; input_arity + self.expressions.len()];
1296 // Increment reference counts for each use
1297 for expr in self.expressions.iter() {
1298 expr.visit_pre(|e| {
1299 if let MirScalarExpr::Column(i, _name) = e {
1300 reference_count[*i] += 1;
1301 }
1302 });
1303 }
1304 for (_, pred) in self.predicates.iter() {
1305 pred.visit_pre(|e| {
1306 if let MirScalarExpr::Column(i, _name) = e {
1307 reference_count[*i] += 1;
1308 }
1309 });
1310 }
1311 for proj in self.projection.iter() {
1312 reference_count[*proj] += 1;
1313 }
1314
1315 // Determine which expressions should be inlined because they reference temporal expressions.
1316 let mut is_temporal = vec![false; input_arity];
1317 for expr in self.expressions.iter() {
1318 // An express may contain a temporal expression, or reference a column containing such.
1319 is_temporal.push(
1320 expr.contains_temporal() || expr.support().into_iter().any(|col| is_temporal[col]),
1321 );
1322 }
1323
1324 // Inline only those columns that 1. are expressions not inputs, and
1325 // 2a. are column references or literals or 2b. have a refcount of 1,
1326 // or 2c. reference temporal expressions (which cannot be evaluated).
1327 let mut should_inline = vec![false; reference_count.len()];
1328 for i in (input_arity..reference_count.len()).rev() {
1329 if let MirScalarExpr::Column(c, _) = self.expressions[i - input_arity] {
1330 should_inline[i] = true;
1331 // The reference count of the referenced column should be
1332 // incremented with the number of references
1333 // `self.expressions[i - input_arity]` has.
1334 // Subtract 1 because `self.expressions[i - input_arity]` is
1335 // itself a reference.
1336 reference_count[c] += reference_count[i] - 1;
1337 } else {
1338 should_inline[i] = reference_count[i] == 1 || is_temporal[i];
1339 }
1340 }
1341 // Inline expressions per `should_inline`.
1342 self.perform_inlining(should_inline);
1343 // We can only inline column references in `self.projection`, but we should.
1344 for proj in self.projection.iter_mut() {
1345 if *proj >= self.input_arity {
1346 if let MirScalarExpr::Column(i, _) = self.expressions[*proj - self.input_arity] {
1347 // TODO(mgree) !!! propagate name information to projection
1348 *proj = i;
1349 }
1350 }
1351 }
1352 }
1353
1354 /// Inlines those expressions that are indicated by should_inline.
1355 /// See `inline_expressions` for usage.
1356 pub fn perform_inlining(&mut self, should_inline: Vec<bool>) {
1357 for index in 0..self.expressions.len() {
1358 let (prior, expr) = self.expressions.split_at_mut(index);
1359 #[allow(deprecated)]
1360 expr[0].visit_mut_post_nolimit(&mut |e| {
1361 if let MirScalarExpr::Column(i, _name) = e {
1362 if should_inline[*i] {
1363 *e = prior[*i - self.input_arity].clone();
1364 }
1365 }
1366 });
1367 }
1368 for (_index, pred) in self.predicates.iter_mut() {
1369 let expressions = &self.expressions;
1370 #[allow(deprecated)]
1371 pred.visit_mut_post_nolimit(&mut |e| {
1372 if let MirScalarExpr::Column(i, _name) = e {
1373 if should_inline[*i] {
1374 *e = expressions[*i - self.input_arity].clone();
1375 }
1376 }
1377 });
1378 }
1379 }
1380
1381 /// Removes unused expressions from `self.expressions`.
1382 ///
1383 /// Expressions are "used" if they are relied upon by any output columns
1384 /// or any predicates, even transitively. Any expressions that are not
1385 /// relied upon in this way can be discarded.
1386 ///
1387 /// # Example
1388 ///
1389 /// ```rust
1390 /// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
    /// // Use the output from the `inline_expressions` example.
1392 /// let mut map_filter_project = MapFilterProject::new(5)
1393 /// .map(vec![
1394 /// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1395 /// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1396 /// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
1397 /// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
1398 /// MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1399 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
1400 /// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
1401 /// ])
1402 /// .project(vec![3,4,8,11]);
1403 ///
1404 /// let mut expected_optimized = MapFilterProject::new(5)
1405 /// .map(vec![
1406 /// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
1407 /// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
1408 /// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
1409 /// ])
1410 /// .project(vec![3,4,6,7]);
1411 ///
    /// // Remove undemanded expressions, streamlining the work done.
1413 /// map_filter_project.remove_undemanded();
1414 ///
1415 /// assert_eq!(
1416 /// map_filter_project,
1417 /// expected_optimized,
1418 /// );
1419 /// ```
    pub fn remove_undemanded(&mut self) {
        // Determine the demanded expressions to remove irrelevant ones.
        let mut demand = BTreeSet::new();
        // Predicates always execute, so their supports are demanded.
        for (_index, pred) in self.predicates.iter() {
            demand.extend(pred.support());
        }
        // Start from the output columns as presumed demanded.
        // If this is not the case, the caller should project some away.
        demand.extend(self.projection.iter().cloned());
        // Proceed in *reverse* order, as expressions may depend on other
        // expressions that precede them.
        for index in (0..self.expressions.len()).rev() {
            if demand.contains(&(self.input_arity + index)) {
                demand.extend(self.expressions[index].support());
            }
        }

        // Maintain a map from initial column identifiers to locations
        // once we have removed undemanded expressions.
        let mut remap = BTreeMap::new();
        // This map only needs to map elements of `demand` to a new location,
        // but the logic is easier if we include all input columns (as the
        // new position is then determined by the size of the map).
        for index in 0..self.input_arity {
            remap.insert(index, index);
        }
        // Retain demanded expressions, and record their new locations.
        let mut new_expressions = Vec::new();
        for (index, expr) in self.expressions.drain(..).enumerate() {
            if demand.contains(&(index + self.input_arity)) {
                // The next free column is exactly `remap.len()`, since the map
                // holds all input columns plus each retained expression so far.
                remap.insert(index + self.input_arity, remap.len());
                new_expressions.push(expr);
            }
        }
        self.expressions = new_expressions;

        // Update column identifiers; rebuild `Self` to re-establish any invariants.
        // We mirror `self.permute(&remap)` but we specifically want to remap columns
        // that are produced by `self.expressions` after the input columns.
        let (expressions, predicates, projection) = self.as_map_filter_project();
        *self = Self::new(self.input_arity)
            .map(expressions.into_iter().map(|mut e| {
                e.permute_map(&remap);
                e
            }))
            .filter(predicates.into_iter().map(|mut p| {
                p.permute_map(&remap);
                p
            }))
            .project(projection.into_iter().map(|c| remap[&c]));
    }
1471}
1472
1473// TODO: move this elsewhere?
1474/// Recursively memoize parts of `expr`, storing those parts in `memoized_parts`.
1475///
1476/// A part of `expr` that is memoized is replaced by a reference to column
1477/// `(input_arity + pos)`, where `pos` is the position of the memoized part in
1478/// `memoized_parts`, and `input_arity` is the arity of the input that `expr`
1479/// refers to.
1480pub fn memoize_expr(
1481 expr: &mut MirScalarExpr,
1482 memoized_parts: &mut Vec<MirScalarExpr>,
1483 input_arity: usize,
1484) {
1485 #[allow(deprecated)]
1486 expr.visit_mut_pre_post_nolimit(
1487 &mut |e| {
1488 // We should not eagerly memoize `if` branches that might not be taken.
1489 // TODO: Memoize expressions in the intersection of `then` and `els`.
1490 if let MirScalarExpr::If { cond, .. } = e {
1491 return Some(vec![cond]);
1492 }
1493 None
1494 },
1495 &mut |e| {
1496 match e {
1497 MirScalarExpr::Literal(_, _) => {
1498 // Literals do not need to be memoized.
1499 }
1500 MirScalarExpr::Column(col, _) => {
1501 // Column references do not need to be memoized, but may need to be
1502 // updated if they reference a column reference themselves.
1503 if *col > input_arity {
1504 if let MirScalarExpr::Column(col2, _) = memoized_parts[*col - input_arity] {
1505 // We do _not_ propagate column names, since mis-associationg names and column
1506 // references will be very confusing (and possibly bug inducing).
1507 *col = col2;
1508 }
1509 }
1510 }
1511 _ => {
1512 if let Some(position) = memoized_parts.iter().position(|e2| e2 == e) {
1513 // Any complex expression that already exists as a prior column can
1514 // be replaced by a reference to that column.
1515 *e = MirScalarExpr::column(input_arity + position);
1516 } else {
1517 // A complex expression that does not exist should be memoized, and
1518 // replaced by a reference to the column.
1519 memoized_parts.push(std::mem::replace(
1520 e,
1521 MirScalarExpr::column(input_arity + memoized_parts.len()),
1522 ));
1523 }
1524 }
1525 }
1526 },
1527 )
1528}
1529
1530pub mod util {
1531 use std::collections::BTreeMap;
1532
1533 use crate::MirScalarExpr;
1534
    #[allow(dead_code)]
    /// A triple of actions that map from rows to (key, val) pairs and back again.
    // NOTE(review): not constructed anywhere visible (hence `allow(dead_code)`);
    // presumably documents the shapes produced by `permutation_for_arrangement`
    // — confirm before removing.
    struct KeyValRowMapping {
        /// Expressions to apply to a row to produce key datums.
        to_key: Vec<MirScalarExpr>,
        /// Columns to project from a row to produce residual value datums.
        to_val: Vec<usize>,
        /// Columns to project from the concatenation of key and value to reconstruct the row.
        to_row: Vec<usize>,
    }
1545
1546 /// Derive supporting logic to support transforming rows to (key, val) pairs,
1547 /// and back again.
1548 ///
1549 /// We are given as input a list of key expressions and an input arity, and the
1550 /// requirement the produced key should be the application of the key expressions.
1551 /// To produce the `val` output, we will identify those input columns not found in
1552 /// the key expressions, and name all other columns.
1553 /// To reconstitute the original row, we identify the sequence of columns from the
1554 /// concatenation of key and val which would reconstruct the original row.
1555 ///
1556 /// The output is a pair of column sequences, the first used to reconstruct a row
1557 /// from the concatenation of key and value, and the second to identify the columns
1558 /// of a row that should become the value associated with its key.
1559 ///
1560 /// The permutations and thinning expressions generated here will be tracked in
1561 /// `dataflow::plan::AvailableCollections`; see the
1562 /// documentation there for more details.
1563 pub fn permutation_for_arrangement(
1564 key: &[MirScalarExpr],
1565 unthinned_arity: usize,
1566 ) -> (Vec<usize>, Vec<usize>) {
1567 let columns_in_key: BTreeMap<_, _> = key
1568 .iter()
1569 .enumerate()
1570 .filter_map(|(i, key_col)| key_col.as_column().map(|c| (c, i)))
1571 .collect();
1572 let mut input_cursor = key.len();
1573 let permutation = (0..unthinned_arity)
1574 .map(|c| {
1575 if let Some(c) = columns_in_key.get(&c) {
1576 // Column is in key (and thus gone from the value
1577 // of the thinned representation)
1578 *c
1579 } else {
1580 // Column remains in value of the thinned representation
1581 input_cursor += 1;
1582 input_cursor - 1
1583 }
1584 })
1585 .collect();
1586 let thinning = (0..unthinned_arity)
1587 .filter(|c| !columns_in_key.contains_key(c))
1588 .collect();
1589 (permutation, thinning)
1590 }
1591
1592 /// Given the permutations (see [`permutation_for_arrangement`] and
1593 /// (`dataflow::plan::AvailableCollections`) corresponding to two
1594 /// collections with the same key arity,
1595 /// computes the permutation for the result of joining them.
1596 pub fn join_permutations(
1597 key_arity: usize,
1598 stream_permutation: Vec<usize>,
1599 thinned_stream_arity: usize,
1600 lookup_permutation: Vec<usize>,
1601 ) -> BTreeMap<usize, usize> {
1602 let stream_arity = stream_permutation.len();
1603 let lookup_arity = lookup_permutation.len();
1604
1605 (0..stream_arity + lookup_arity)
1606 .map(|i| {
1607 let location = if i < stream_arity {
1608 stream_permutation[i]
1609 } else {
1610 let location_in_lookup = lookup_permutation[i - stream_arity];
1611 if location_in_lookup < key_arity {
1612 location_in_lookup
1613 } else {
1614 location_in_lookup + thinned_stream_arity
1615 }
1616 };
1617 (i, location)
1618 })
1619 .collect()
1620 }
1621}
1622
1623pub mod plan {
1624 use std::iter;
1625
1626 use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
1627 use mz_repr::{Datum, Diff, Row, RowArena};
1628 use proptest::prelude::*;
1629 use proptest_derive::Arbitrary;
1630 use serde::{Deserialize, Serialize};
1631
1632 use crate::{
1633 BinaryFunc, EvalError, MapFilterProject, MirScalarExpr, ProtoMfpPlan, ProtoSafeMfpPlan,
1634 UnaryFunc, UnmaterializableFunc, func,
1635 };
1636
    /// A wrapper type which indicates it is safe to simply evaluate all expressions.
    #[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
    pub struct SafeMfpPlan {
        /// The wrapped operator; `MfpPlan::create_from` populates this with what
        /// remains after temporal predicates have been split off.
        pub(crate) mfp: MapFilterProject,
    }
1642
1643 impl RustType<ProtoSafeMfpPlan> for SafeMfpPlan {
1644 fn into_proto(&self) -> ProtoSafeMfpPlan {
1645 ProtoSafeMfpPlan {
1646 mfp: Some(self.mfp.into_proto()),
1647 }
1648 }
1649
1650 fn from_proto(proto: ProtoSafeMfpPlan) -> Result<Self, TryFromProtoError> {
1651 Ok(SafeMfpPlan {
1652 mfp: proto.mfp.into_rust_if_some("ProtoSafeMfpPlan::mfp")?,
1653 })
1654 }
1655 }
1656
1657 impl SafeMfpPlan {
        /// Remaps references to input columns according to `remap`.
        ///
        /// Leaves other column references, e.g. to newly mapped columns, unchanged.
        ///
        /// Simply forwards to [`MapFilterProject::permute_fn`] on the wrapped plan.
        pub fn permute_fn<F>(&mut self, remap: F, new_arity: usize)
        where
            F: Fn(usize) -> usize,
        {
            self.mfp.permute_fn(remap, new_arity);
        }
1667 /// Evaluates the linear operator on a supplied list of datums.
1668 ///
1669 /// The arguments are the initial datums associated with the row,
1670 /// and an appropriately lifetimed arena for temporary allocations
1671 /// needed by scalar evaluation.
1672 ///
1673 /// An `Ok` result will either be `None` if any predicate did not
1674 /// evaluate to `Datum::True`, or the values of the columns listed
1675 /// by `self.projection` if all predicates passed. If an error
1676 /// occurs in the evaluation it is returned as an `Err` variant.
1677 /// As the evaluation exits early with failed predicates, it may
1678 /// miss some errors that would occur later in evaluation.
1679 ///
1680 /// The `row` is not cleared first, but emptied if the function
1681 /// returns `Ok(Some(row)).
1682 #[inline(always)]
1683 pub fn evaluate_into<'a, 'row>(
1684 &'a self,
1685 datums: &mut Vec<Datum<'a>>,
1686 arena: &'a RowArena,
1687 row_buf: &'row mut Row,
1688 ) -> Result<Option<&'row Row>, EvalError> {
1689 let passed_predicates = self.evaluate_inner(datums, arena)?;
1690 if !passed_predicates {
1691 Ok(None)
1692 } else {
1693 row_buf
1694 .packer()
1695 .extend(self.mfp.projection.iter().map(|c| datums[*c]));
1696 Ok(Some(row_buf))
1697 }
1698 }
1699
1700 /// A version of `evaluate` which produces an iterator over `Datum`
1701 /// as output.
1702 ///
1703 /// This version can be useful when one wants to capture the resulting
1704 /// datums without packing and then unpacking a row.
1705 #[inline(always)]
1706 pub fn evaluate_iter<'b, 'a: 'b>(
1707 &'a self,
1708 datums: &'b mut Vec<Datum<'a>>,
1709 arena: &'a RowArena,
1710 ) -> Result<Option<impl Iterator<Item = Datum<'a>> + 'b>, EvalError> {
1711 let passed_predicates = self.evaluate_inner(datums, arena)?;
1712 if !passed_predicates {
1713 Ok(None)
1714 } else {
1715 Ok(Some(self.mfp.projection.iter().map(move |i| datums[*i])))
1716 }
1717 }
1718
        /// Populates `datums` with `self.expressions` and tests `self.predicates`.
        ///
        /// This does not apply `self.projection`, which is up to the calling method.
        ///
        /// Relies on `self.mfp.predicates` being ordered by their `support` value
        /// (a column-count threshold recorded with each predicate), so that
        /// expression evaluation and predicate testing can be interleaved: each
        /// predicate is tested as soon as the columns it requires exist, allowing
        /// an early exit before later expressions are evaluated.
        pub fn evaluate_inner<'b, 'a: 'b>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
        ) -> Result<bool, EvalError> {
            // Index of the next expression to evaluate; expression `i` produces
            // column `input_arity + i`.
            let mut expression = 0;
            for (support, predicate) in self.mfp.predicates.iter() {
                // Evaluate (and append) just enough expressions that the columns
                // required by `predicate` are available.
                while self.mfp.input_arity + expression < *support {
                    datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
                    expression += 1;
                }
                // A predicate that does not evaluate to `True` filters the row out.
                if predicate.eval(&datums[..], arena)? != Datum::True {
                    return Ok(false);
                }
            }
            // All predicates passed; evaluate any remaining expressions.
            while expression < self.mfp.expressions.len() {
                datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
                expression += 1;
            }
            Ok(true)
        }
1743
1744 /// Returns true if evaluation could introduce an error on non-error inputs.
1745 pub fn could_error(&self) -> bool {
1746 self.mfp.predicates.iter().any(|(_pos, e)| e.could_error())
1747 || self.mfp.expressions.iter().any(|e| e.could_error())
1748 }
1749
        /// Returns true when `Self` is the identity.
        ///
        /// Forwards to [`MapFilterProject::is_identity`] on the wrapped plan.
        pub fn is_identity(&self) -> bool {
            self.mfp.is_identity()
        }
1754 }
1755
    // Allow read-only access to the wrapped `MapFilterProject`'s fields and
    // methods without explicitly unwrapping `self.mfp`.
    impl std::ops::Deref for SafeMfpPlan {
        type Target = MapFilterProject;
        fn deref(&self) -> &Self::Target {
            &self.mfp
        }
    }
1762
    /// Predicates partitioned into temporal and non-temporal.
    ///
    /// Temporal predicates require some recognition to determine their
    /// structure, and it is best to do that once and re-use the results.
    ///
    /// There are restrictions on the temporal predicates we currently support.
    /// They must directly constrain `MzNow` from below or above,
    /// by expressions that do not themselves contain `MzNow`.
    /// Conjunctions of such constraints are also ok.
    #[derive(Arbitrary, Clone, Debug, PartialEq)]
    pub struct MfpPlan {
        /// Normal predicates to evaluate on `&[Datum]` and expect `Ok(Datum::True)`.
        pub(crate) mfp: SafeMfpPlan,
        /// Expressions that when evaluated lower-bound `MzNow`.
        // Proptest generates at most one bound per side (`0..2` is exclusive)
        // to keep generated cases small.
        #[proptest(strategy = "prop::collection::vec(any::<MirScalarExpr>(), 0..2)")]
        pub(crate) lower_bounds: Vec<MirScalarExpr>,
        /// Expressions that when evaluated upper-bound `MzNow`.
        #[proptest(strategy = "prop::collection::vec(any::<MirScalarExpr>(), 0..2)")]
        pub(crate) upper_bounds: Vec<MirScalarExpr>,
    }
1783
1784 impl RustType<ProtoMfpPlan> for MfpPlan {
1785 fn into_proto(&self) -> ProtoMfpPlan {
1786 ProtoMfpPlan {
1787 mfp: Some(self.mfp.into_proto()),
1788 lower_bounds: self.lower_bounds.into_proto(),
1789 upper_bounds: self.upper_bounds.into_proto(),
1790 }
1791 }
1792
1793 fn from_proto(proto: ProtoMfpPlan) -> Result<Self, TryFromProtoError> {
1794 Ok(MfpPlan {
1795 mfp: proto.mfp.into_rust_if_some("ProtoMfpPlan::mfp")?,
1796 lower_bounds: proto.lower_bounds.into_rust()?,
1797 upper_bounds: proto.upper_bounds.into_rust()?,
1798 })
1799 }
1800 }
1801
1802 impl MfpPlan {
        /// Partitions `predicates` into non-temporal, and lower and upper temporal bounds.
        ///
        /// The first returned list is of predicates that do not contain `mz_now`.
        /// The second and third returned lists contain expressions that, once evaluated, lower
        /// and upper bound the validity interval of a record, respectively. These second two
        /// lists are populated only by binary expressions of the form
        /// ```ignore
        /// mz_now cmp_op expr
        /// ```
        /// where `cmp_op` is a comparison operator and `expr` does not contain `mz_now`.
        ///
        /// If any unsupported expression is found, for example one that uses `mz_now`
        /// in an unsupported position, an error is returned.
        pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, String> {
            let mut lower_bounds = Vec::new();
            let mut upper_bounds = Vec::new();

            let mut temporal = Vec::new();

            // Optimize, to ensure that temporal predicates are moved into `mfp.predicates`.
            mfp.optimize();

            // Split off temporal predicates, leaving the non-temporal ones in place.
            mfp.predicates.retain(|(_position, predicate)| {
                if predicate.contains_temporal() {
                    temporal.push(predicate.clone());
                    false
                } else {
                    true
                }
            });

            for predicate in temporal.into_iter() {
                // Supported temporal predicates are exclusively binary operators.
                if let MirScalarExpr::CallBinary {
                    mut func,
                    mut expr1,
                    mut expr2,
                } = predicate
                {
                    // Attempt to put `mz_now()` in the first argument position,
                    // flipping the comparison operator to compensate for the swap.
                    if !expr1.contains_temporal()
                        && *expr2
                            == MirScalarExpr::CallUnmaterializable(UnmaterializableFunc::MzNow)
                    {
                        std::mem::swap(&mut expr1, &mut expr2);
                        func = match func {
                            BinaryFunc::Eq => BinaryFunc::Eq,
                            BinaryFunc::Lt => BinaryFunc::Gt,
                            BinaryFunc::Lte => BinaryFunc::Gte,
                            BinaryFunc::Gt => BinaryFunc::Lt,
                            BinaryFunc::Gte => BinaryFunc::Lte,
                            x => {
                                return Err(format!(
                                    "Unsupported binary temporal operation: {:?}",
                                    x
                                ));
                            }
                        };
                    }

                    // Error if `mz_now()` is referenced in an unsupported position.
                    if expr2.contains_temporal()
                        || *expr1
                            != MirScalarExpr::CallUnmaterializable(UnmaterializableFunc::MzNow)
                    {
                        return Err(format!(
                            "Unsupported temporal predicate. Note: `mz_now()` must be directly compared to a mz_timestamp-castable expression. Expression found: {}",
                            MirScalarExpr::CallBinary { func, expr1, expr2 },
                        ));
                    }

                    // `mz_now() <op> <expr2>` for several supported operators.
                    // Bounds describe a half-open validity interval, so inclusive
                    // comparisons are adjusted by advancing the timestamp via
                    // `StepMzTimestamp` where needed.
                    match func {
                        BinaryFunc::Eq => {
                            // Equality bounds the interval on both sides:
                            // `[expr2, step(expr2))`.
                            lower_bounds.push(*expr2.clone());
                            upper_bounds.push(
                                expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
                            );
                        }
                        BinaryFunc::Lt => {
                            upper_bounds.push(*expr2.clone());
                        }
                        BinaryFunc::Lte => {
                            upper_bounds.push(
                                expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
                            );
                        }
                        BinaryFunc::Gt => {
                            lower_bounds.push(
                                expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
                            );
                        }
                        BinaryFunc::Gte => {
                            lower_bounds.push(*expr2.clone());
                        }
                        _ => {
                            return Err(format!(
                                "Unsupported binary temporal operation: {:?}",
                                func
                            ));
                        }
                    }
                } else {
                    return Err(format!(
                        "Unsupported temporal predicate. Note: `mz_now()` must be directly compared to a non-temporal expression of mz_timestamp-castable type. Expression found: {}",
                        predicate,
                    ));
                }
            }

            Ok(Self {
                mfp: SafeMfpPlan { mfp },
                lower_bounds,
                upper_bounds,
            })
        }
1919
1920 /// Indicates if the planned `MapFilterProject` emits exactly its inputs as outputs.
1921 pub fn is_identity(&self) -> bool {
1922 self.mfp.mfp.is_identity()
1923 && self.lower_bounds.is_empty()
1924 && self.upper_bounds.is_empty()
1925 }
1926
1927 /// Returns `self`, and leaves behind an identity operator that acts on its output.
1928 pub fn take(&mut self) -> Self {
1929 let mut identity = Self {
1930 mfp: SafeMfpPlan {
1931 mfp: MapFilterProject::new(self.mfp.projection.len()),
1932 },
1933 lower_bounds: Default::default(),
1934 upper_bounds: Default::default(),
1935 };
1936 std::mem::swap(self, &mut identity);
1937 identity
1938 }
1939
1940 /// Attempt to convert self into a non-temporal MapFilterProject plan.
1941 ///
1942 /// If that is not possible, the original instance is returned as an error.
1943 #[allow(clippy::result_large_err)]
1944 pub fn into_nontemporal(self) -> Result<SafeMfpPlan, Self> {
1945 if self.lower_bounds.is_empty() && self.upper_bounds.is_empty() {
1946 Ok(self.mfp)
1947 } else {
1948 Err(self)
1949 }
1950 }
1951
1952 /// Returns an iterator over mutable references to all non-temporal
1953 /// scalar expressions in the plan.
1954 ///
1955 /// The order of iteration is unspecified.
1956 pub fn iter_nontemporal_exprs(&mut self) -> impl Iterator<Item = &mut MirScalarExpr> {
1957 iter::empty()
1958 .chain(self.mfp.mfp.predicates.iter_mut().map(|(_, expr)| expr))
1959 .chain(&mut self.mfp.mfp.expressions)
1960 .chain(&mut self.lower_bounds)
1961 .chain(&mut self.upper_bounds)
1962 }
1963
        /// Evaluate the predicates, temporal and non-, and return times and differences for `data`.
        ///
        /// If `self` contains only non-temporal predicates, the result will either be `(time, diff)`,
        /// or an evaluation error. If `self` contains temporal predicates, the results can be times
        /// that are greater than the input `time`, and may contain negated `diff` values.
        ///
        /// The `row_builder` is not cleared first, but emptied if the function
        /// returns an iterator with any `Ok(_)` element.
        pub fn evaluate<'b, 'a: 'b, E: From<EvalError>, V: Fn(&mz_repr::Timestamp) -> bool>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
            time: mz_repr::Timestamp,
            diff: Diff,
            valid_time: V,
            row_builder: &mut Row,
        ) -> impl Iterator<
            Item = Result<(Row, mz_repr::Timestamp, Diff), (E, mz_repr::Timestamp, Diff)>,
        > + use<E, V> {
            // Every return path below constructs the same concrete iterator type:
            // two chained `Option` iterators, yielding at most two results.
            match self.mfp.evaluate_inner(datums, arena) {
                Err(e) => {
                    return Some(Err((e.into(), time, diff))).into_iter().chain(None);
                }
                Ok(true) => {}
                Ok(false) => {
                    return None.into_iter().chain(None);
                }
            }

            // Lower and upper bounds.
            let mut lower_bound = time;
            let mut upper_bound = None;

            // Track whether we have seen a null in either bound, as this should
            // prevent the record from being produced at any time.
            let mut null_eval = false;

            // Advance our lower bound to be at least the result of any lower bound
            // expressions.
            for l in self.lower_bounds.iter() {
                match l.eval(datums, arena) {
                    Err(e) => {
                        return Some(Err((e.into(), time, diff)))
                            .into_iter()
                            .chain(None.into_iter());
                    }
                    Ok(Datum::MzTimestamp(d)) => {
                        lower_bound = lower_bound.max(d);
                    }
                    Ok(Datum::Null) => {
                        null_eval = true;
                    }
                    x => {
                        panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
                    }
                }
            }

            // If the lower bound exceeds our `until` frontier, it should not appear in the output.
            if !valid_time(&lower_bound) {
                return None.into_iter().chain(None);
            }

            // If there are any upper bounds, determine the minimum upper bound.
            for u in self.upper_bounds.iter() {
                // We can cease as soon as the lower and upper bounds match,
                // as the update will certainly not be produced in that case.
                if upper_bound != Some(lower_bound) {
                    match u.eval(datums, arena) {
                        Err(e) => {
                            return Some(Err((e.into(), time, diff)))
                                .into_iter()
                                .chain(None.into_iter());
                        }
                        Ok(Datum::MzTimestamp(d)) => {
                            if let Some(upper) = upper_bound {
                                upper_bound = Some(upper.min(d));
                            } else {
                                upper_bound = Some(d);
                            };
                            // Force the upper bound to be at least the lower
                            // bound. The `is_some()` test should always be true
                            // due to the above block, but maintain it here in
                            // case that changes. It's hopefully optimized away.
                            if upper_bound.is_some() && upper_bound < Some(lower_bound) {
                                upper_bound = Some(lower_bound);
                            }
                        }
                        Ok(Datum::Null) => {
                            null_eval = true;
                        }
                        x => {
                            panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
                        }
                    }
                }
            }

            // If the upper bound exceeds our `until` frontier, it should not appear in the output.
            if let Some(upper) = &mut upper_bound {
                if !valid_time(upper) {
                    upper_bound = None;
                }
            }

            // Produce an output only if the upper bound exceeds the lower bound,
            // and if we did not encounter a `null` in our evaluation.
            if Some(lower_bound) != upper_bound && !null_eval {
                row_builder
                    .packer()
                    .extend(self.mfp.mfp.projection.iter().map(|c| datums[*c]));
                // The update is produced at the lower bound and, when the
                // interval is bounded above, retracted (`-diff`) at the upper bound.
                let upper_opt =
                    upper_bound.map(|upper_bound| Ok((row_builder.clone(), upper_bound, -diff)));
                let lower = Some(Ok((row_builder.clone(), lower_bound, diff)));
                lower.into_iter().chain(upper_opt)
            } else {
                None.into_iter().chain(None)
            }
        }
2083
2084 /// Returns true if evaluation could introduce an error on non-error inputs.
2085 pub fn could_error(&self) -> bool {
2086 self.mfp.could_error()
2087 || self.lower_bounds.iter().any(|e| e.could_error())
2088 || self.upper_bounds.iter().any(|e| e.could_error())
2089 }
2090
2091 /// Indicates that `Self` ignores its input to the extent that it can be evaluated on `&[]`.
2092 ///
2093 /// At the moment, this is only true if it projects away all columns and applies no filters,
2094 /// but it could be extended to plans that produce literals independent of the input.
2095 pub fn ignores_input(&self) -> bool {
2096 self.lower_bounds.is_empty()
2097 && self.upper_bounds.is_empty()
2098 && self.mfp.mfp.projection.is_empty()
2099 && self.mfp.mfp.predicates.is_empty()
2100 }
2101 }
2102}
2103
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;

    use crate::linear::plan::*;

    use super::*;

    proptest! {
        // Limit the number of generated cases; see `ProptestConfig`.
        #![proptest_config(ProptestConfig::with_cases(32))]

        // Checks that `MfpPlan` survives a protobuf encode/decode round trip.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // too slow
        fn mfp_plan_protobuf_roundtrip(expect in any::<MfpPlan>()) {
            let actual = protobuf_roundtrip::<_, ProtoMfpPlan>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}