mz_compute_types/plan.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An explicit representation of a rendering plan for provided dataflows.
11
12#![warn(missing_debug_implementations)]
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::num::NonZeroU64;
16
17use columnar::Columnar;
18use mz_expr::{
19 CollectionPlan, EvalError, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr,
20 OptimizedMirRelationExpr, TableFunc,
21};
22use mz_ore::soft_assert_eq_no_log;
23use mz_ore::str::Indent;
24use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
25use mz_repr::explain::text::text_string_at;
26use mz_repr::explain::{DummyHumanizer, ExplainConfig, ExprHumanizer, PlanRenderingContext};
27use mz_repr::optimize::OptimizerFeatures;
28use mz_repr::{Diff, GlobalId, Row};
29use proptest::arbitrary::Arbitrary;
30use proptest::prelude::*;
31use proptest::strategy::Strategy;
32use proptest_derive::Arbitrary;
33use serde::{Deserialize, Serialize};
34
35use crate::dataflows::DataflowDescription;
36use crate::plan::join::JoinPlan;
37use crate::plan::reduce::{KeyValPlan, ReducePlan};
38use crate::plan::threshold::ThresholdPlan;
39use crate::plan::top_k::TopKPlan;
40use crate::plan::transform::{Transform, TransformConfig};
41
42mod lowering;
43
44pub mod interpret;
45pub mod join;
46pub mod reduce;
47pub mod render_plan;
48pub mod threshold;
49pub mod top_k;
50pub mod transform;
51
52include!(concat!(env!("OUT_DIR"), "/mz_compute_types.plan.rs"));
53
/// The forms in which an operator's output is available.
///
/// These forms may include "raw", meaning as a streamed collection, but also any
/// number of "arranged" representations.
///
/// Each arranged representation is described by a `KeyValRowMapping`, or rather
/// at the moment by its three fields in a triple. These fields explain how to form
/// a "key" by applying some expressions to each row, how to select "values" from
/// columns not explicitly captured by the key, and how to return to the original
/// row from the concatenation of key and value. Further explanation is available
/// in the documentation for `KeyValRowMapping`.
///
/// Note that the derived `Default` value has `raw == false` and no arrangements,
/// i.e. it describes no available form at all; `arbitrary_arrangement` asserts
/// against exactly that state.
// NOTE: field order is significant for the derived `Ord`/`PartialOrd`.
#[derive(
    Arbitrary, Clone, Debug, Default, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize,
)]
pub struct AvailableCollections {
    /// Whether the collection exists in unarranged form.
    pub raw: bool,
    /// The list of available arrangements, presented as a `KeyValRowMapping`,
    /// but here represented by a triple `(to_key, to_val, to_row)` instead.
    /// The documentation for `KeyValRowMapping` explains these fields better.
    // Property tests generate fewer than three arrangements, each drawn from the
    // size-limited `any_arranged_thin` strategy, to keep generated plans small.
    #[proptest(strategy = "prop::collection::vec(any_arranged_thin(), 0..3)")]
    pub arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>,
}
77
78/// A strategy that produces arrangements that are thinner than the default. That is
79/// the number of direct children is limited to a maximum of 3.
80pub(crate) fn any_arranged_thin()
81-> impl Strategy<Value = (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
82 (
83 prop::collection::vec(MirScalarExpr::arbitrary(), 0..3),
84 Vec::<usize>::arbitrary(),
85 Vec::<usize>::arbitrary(),
86 )
87}
88
89impl RustType<ProtoAvailableCollections> for AvailableCollections {
90 fn into_proto(&self) -> ProtoAvailableCollections {
91 ProtoAvailableCollections {
92 raw: self.raw,
93 arranged: self.arranged.into_proto(),
94 }
95 }
96
97 fn from_proto(x: ProtoAvailableCollections) -> Result<Self, TryFromProtoError> {
98 Ok({
99 Self {
100 raw: x.raw,
101 arranged: x.arranged.into_rust()?,
102 }
103 })
104 }
105}
106
107impl AvailableCollections {
108 /// Represent a collection that has no arrangements.
109 pub fn new_raw() -> Self {
110 Self {
111 raw: true,
112 arranged: Vec::new(),
113 }
114 }
115
116 /// Represent a collection that is arranged in the specified ways.
117 pub fn new_arranged(arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>) -> Self {
118 assert!(
119 !arranged.is_empty(),
120 "Invariant violated: at least one collection must exist"
121 );
122 Self {
123 raw: false,
124 arranged,
125 }
126 }
127
128 /// Get some arrangement, if one exists.
129 pub fn arbitrary_arrangement(&self) -> Option<&(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
130 assert!(
131 self.raw || !self.arranged.is_empty(),
132 "Invariant violated: at least one collection must exist"
133 );
134 self.arranged.get(0)
135 }
136}
137
138/// An identifier for an LIR node.
139#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize, Columnar)]
140pub struct LirId(u64);
141
142impl LirId {
143 fn as_u64(&self) -> u64 {
144 self.0
145 }
146}
147
148impl From<LirId> for u64 {
149 fn from(value: LirId) -> Self {
150 value.as_u64()
151 }
152}
153
154impl std::fmt::Display for LirId {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 write!(f, "{}", self.0)
157 }
158}
159
160impl RustType<u64> for LirId {
161 fn into_proto(&self) -> u64 {
162 self.0
163 }
164
165 fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
166 Ok(Self(proto))
167 }
168}
169
/// A rendering plan with as much conditional logic as possible removed.
///
/// Each node pairs an operator ([`PlanNode`]) with an [`LirId`] naming it within
/// its dataflow. The timestamp type `T` (default `mz_repr::Timestamp`) is the one
/// used by the update triples of `PlanNode::Constant`.
// NOTE: field order is significant for the derived `Ord`/`PartialOrd`.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub struct Plan<T = mz_repr::Timestamp> {
    /// A dataflow-local identifier.
    pub lir_id: LirId,
    /// The underlying operator.
    pub node: PlanNode<T>,
}
178
/// The actual AST node of the `Plan`.
///
/// Each variant is one LIR operator. Child operators are held as nested [`Plan`]s,
/// so every node in the tree carries its own [`LirId`].
// NOTE: variant and field order are significant for the derived `Ord`/`PartialOrd`
// (and possibly for serialization formats that encode by position) — do not reorder.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub enum PlanNode<T = mz_repr::Timestamp> {
    /// A collection whose contents are fixed in advance.
    Constant {
        /// Explicit update triples for the collection, or an `EvalError` in their place.
        rows: Result<Vec<(Row, T, Diff)>, EvalError>,
    },
    /// A reference to a bound collection.
    ///
    /// This is commonly either an external reference to an existing source or
    /// maintained arrangement, or an internal reference to a `Let` identifier.
    Get {
        /// A global or local identifier naming the collection.
        id: Id,
        /// Arrangements that will be available.
        ///
        /// The collection will also be loaded if available, which it will
        /// not be for imported data, but which it may be for locally defined
        /// data.
        // TODO: Be more explicit about whether a collection is available,
        // although one can always produce it from an arrangement, and it
        // seems generally advantageous to do that instead (to avoid cloning
        // rows, by using `mfp` first on borrowed data).
        keys: AvailableCollections,
        /// The actions to take when introducing the collection.
        plan: GetPlan,
    },
    /// Binds `value` to `id`, and then results in `body` with that binding.
    ///
    /// This stage has the effect of sharing `value` across multiple possible
    /// uses in `body`, and is the only mechanism we have for sharing collection
    /// information across parts of a dataflow.
    ///
    /// The binding is not available outside of `body`.
    Let {
        /// The local identifier to be used, available to `body` as `Id::Local(id)`.
        id: LocalId,
        /// The collection that should be bound to `id`.
        value: Box<Plan<T>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan<T>>,
    },
    /// Binds `values` to `ids`, evaluates them potentially recursively, and returns `body`.
    ///
    /// All bindings are available to all bindings, and to `body`.
    /// The contents of each binding are initially empty, and then updated through a sequence
    /// of iterations in which each binding is updated in sequence, from the most recent values
    /// of all bindings.
    LetRec {
        /// The local identifiers to be used, available to `body` as `Id::Local(id)`.
        ids: Vec<LocalId>,
        /// The collections that should be bound to `ids` (presumably pairwise, i.e.
        /// `values[i]` is bound to `ids[i]`).
        values: Vec<Plan<T>>,
        /// Maximum number of iterations, one optional entry per binding (presumably
        /// aligned with `ids`). See further info on the MIR `LetRec`.
        limits: Vec<Option<LetRecLimit>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference any `Id::Local(id)` from `ids`.
        body: Box<Plan<T>>,
    },
    /// Map, Filter, and Project operators.
    ///
    /// This stage contains work that we would ideally like to fuse to other plan
    /// stages, but for practical reasons cannot. For example: threshold, topk,
    /// and sometimes reduce stages are not able to absorb this operator.
    Mfp {
        /// The input collection.
        input: Box<Plan<T>>,
        /// Linear operator to apply to each record.
        mfp: MapFilterProject,
        /// Whether the input is from an arrangement, and if so,
        /// whether we can seek to a specific value therein.
        input_key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
    },
    /// A variable number of output records for each input record.
    ///
    /// This stage is a bit of a catch-all for logic that does not easily fit in
    /// map stages. This includes table valued functions, but also functions of
    /// multiple arguments, and functions that modify the sign of updates.
    ///
    /// This stage allows a `MapFilterProject` operator to be fused to its output,
    /// and this can be very important as otherwise the output of `func` is just
    /// appended to the input record, for as many outputs as it has. This has the
    /// unpleasant default behavior of repeating potentially large records that
    /// are being unpacked, producing quadratic output in those cases. Instead,
    /// in these cases use a `mfp` member that projects away these large fields.
    FlatMap {
        /// The particular arrangement of the input we expect to use,
        /// if any.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan<T>>,
        /// Expressions that for each row prepare the arguments to `func`.
        exprs: Vec<MirScalarExpr>,
        /// The variable-record emitting function.
        func: TableFunc,
        /// Linear operator to apply to each record produced by `func`.
        mfp_after: MapFilterProject,
    },
    /// A multiway relational equijoin, with fused map, filter, and projection.
    ///
    /// This stage performs a multiway join among `inputs`, using the equality
    /// constraints expressed in `plan`. The plan also describes the implementation
    /// strategy we will use, and any pushed down per-record work.
    Join {
        /// An ordered list of inputs that will be joined.
        inputs: Vec<Plan<T>>,
        /// Detailed information about the implementation of the join.
        ///
        /// This includes information about the implementation strategy, but also
        /// any map, filter, project work that we might follow the join with, but
        /// potentially pushed down into the implementation of the join.
        plan: JoinPlan,
    },
    /// Aggregation by key.
    Reduce {
        /// The particular arrangement of the input we expect to use,
        /// if any.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for changing input records into key, value pairs.
        key_val_plan: KeyValPlan,
        /// A plan for performing the reduce.
        ///
        /// The implementation of reduction has several different strategies based
        /// on the properties of the reduction, and the input itself. Please check
        /// out the documentation for this type for more detail.
        plan: ReducePlan,
        /// An MFP that must be applied to results. The projection part of this
        /// MFP must preserve the key for the reduction; otherwise, the results
        /// become undefined. Additionally, the MFP must be free from temporal
        /// predicates so that it can be readily evaluated.
        /// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]?
        mfp_after: MapFilterProject,
    },
    /// Key-based "Top K" operator, retaining the first K records in each group.
    TopK {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for performing the Top-K.
        ///
        /// Please check out the documentation for [`TopKPlan`] for details on the
        /// available implementation strategies.
        top_k_plan: TopKPlan,
    },
    /// Inverts the sign of each update.
    Negate {
        /// The input collection.
        input: Box<Plan<T>>,
    },
    /// Filters records that accumulate negatively.
    ///
    /// Although the operator suppresses updates, it is a stateful operator taking
    /// resources proportional to the number of records with non-zero accumulation.
    Threshold {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for performing the threshold.
        ///
        /// Please check out the documentation for [`ThresholdPlan`] for details on
        /// the available implementation strategies.
        threshold_plan: ThresholdPlan,
    },
    /// Adds the contents of the input collections.
    ///
    /// Importantly, this is *multiset* union, so the multiplicities of records will
    /// add. This is in contrast to *set* union, where the multiplicities would be
    /// capped at one. A set union can be formed with `Union` followed by `Reduce`
    /// implementing the "distinct" operator.
    Union {
        /// The input collections.
        inputs: Vec<Plan<T>>,
        /// Whether to consolidate the output, e.g., cancel negated records.
        consolidate_output: bool,
    },
    /// The `input` plan, but with additional arrangements.
    ///
    /// This operator does not change the logical contents of `input`, but ensures
    /// that certain arrangements are available in the results. This operator can
    /// be important for e.g. the `Join` stage which benefits from multiple arrangements
    /// or to cap a `Plan` so that indexes can be exported.
    ArrangeBy {
        /// The key that must be used to access the input.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan<T>>,
        /// The MFP that must be applied to the input.
        input_mfp: MapFilterProject,
        /// A list of arrangement keys, and possibly a raw collection,
        /// that will be added to those of the input. Does not include
        /// any other existing arrangements.
        forms: AvailableCollections,
    },
}
377
378impl<T> PlanNode<T> {
379 /// Iterates through references to child expressions.
380 pub fn children(&self) -> impl Iterator<Item = &Plan<T>> {
381 let mut first = None;
382 let mut second = None;
383 let mut rest = None;
384 let mut last = None;
385
386 use PlanNode::*;
387 match self {
388 Constant { .. } | Get { .. } => (),
389 Let { value, body, .. } => {
390 first = Some(&**value);
391 second = Some(&**body);
392 }
393 LetRec { values, body, .. } => {
394 rest = Some(values);
395 last = Some(&**body);
396 }
397 Mfp { input, .. }
398 | FlatMap { input, .. }
399 | Reduce { input, .. }
400 | TopK { input, .. }
401 | Negate { input, .. }
402 | Threshold { input, .. }
403 | ArrangeBy { input, .. } => {
404 first = Some(&**input);
405 }
406 Join { inputs, .. } | Union { inputs, .. } => {
407 rest = Some(inputs);
408 }
409 }
410
411 first
412 .into_iter()
413 .chain(second)
414 .chain(rest.into_iter().flatten())
415 .chain(last)
416 }
417
418 /// Iterates through mutable references to child expressions.
419 pub fn children_mut(&mut self) -> impl Iterator<Item = &mut Plan<T>> {
420 let mut first = None;
421 let mut second = None;
422 let mut rest = None;
423 let mut last = None;
424
425 use PlanNode::*;
426 match self {
427 Constant { .. } | Get { .. } => (),
428 Let { value, body, .. } => {
429 first = Some(&mut **value);
430 second = Some(&mut **body);
431 }
432 LetRec { values, body, .. } => {
433 rest = Some(values);
434 last = Some(&mut **body);
435 }
436 Mfp { input, .. }
437 | FlatMap { input, .. }
438 | Reduce { input, .. }
439 | TopK { input, .. }
440 | Negate { input, .. }
441 | Threshold { input, .. }
442 | ArrangeBy { input, .. } => {
443 first = Some(&mut **input);
444 }
445 Join { inputs, .. } | Union { inputs, .. } => {
446 rest = Some(inputs);
447 }
448 }
449
450 first
451 .into_iter()
452 .chain(second)
453 .chain(rest.into_iter().flatten())
454 .chain(last)
455 }
456}
457
458impl<T> PlanNode<T> {
459 /// Attach an `lir_id` to a `PlanNode` to make a complete `Plan`.
460 pub fn as_plan(self, lir_id: LirId) -> Plan<T> {
461 Plan { lir_id, node: self }
462 }
463}
464
465impl Plan {
466 /// Pretty-print this [Plan] to a string.
467 pub fn pretty(&self) -> String {
468 let config = ExplainConfig::default();
469 self.explain(&config, None)
470 }
471
472 /// Pretty-print this [Plan] to a string using a custom
473 /// [ExplainConfig] and an optionally provided [ExprHumanizer].
474 pub fn explain(&self, config: &ExplainConfig, humanizer: Option<&dyn ExprHumanizer>) -> String {
475 text_string_at(self, || PlanRenderingContext {
476 indent: Indent::default(),
477 humanizer: humanizer.unwrap_or(&DummyHumanizer),
478 annotations: BTreeMap::default(),
479 config,
480 })
481 }
482}
483
484impl Arbitrary for LirId {
485 type Strategy = BoxedStrategy<LirId>;
486 type Parameters = ();
487
488 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
489 let lir_id = u64::arbitrary();
490 lir_id.prop_map(LirId).boxed()
491 }
492}
493
impl Arbitrary for Plan {
    type Strategy = BoxedStrategy<Plan>;
    type Parameters = ();

    /// Generates random, depth-bounded [`Plan`]s for property tests.
    ///
    /// Leaves are `Constant` and `Get` nodes (the latter over global ids only).
    /// Interior nodes cover every `PlanNode` variant except `LetRec`. Each component
    /// is drawn independently, so generated plans are not guaranteed to be
    /// internally consistent (e.g. a `JoinPlan` need not match its inputs).
    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        // Zero or one update triple (`0..2` is exclusive). `(1..5)` parameterizes the
        // generated rows — presumably bounding their number of datums; confirm against
        // `Row`'s `Arbitrary` implementation.
        let row_diff = prop::collection::vec(
            (
                Row::arbitrary_with((1..5).into()),
                mz_repr::Timestamp::arbitrary(),
                Diff::arbitrary(),
            ),
            0..2,
        );
        // A constant holds either the update triples or an evaluation error.
        let rows = prop::result::maybe_ok(row_diff, EvalError::arbitrary());
        let constant = (rows, any::<LirId>()).prop_map(|(rows, lir_id)| {
            PlanNode::<mz_repr::Timestamp>::Constant { rows }.as_plan(lir_id)
        });

        // Only global ids are generated, so no leaf `Get` refers to a local binding.
        let get = (
            any::<GlobalId>(),
            any::<AvailableCollections>(),
            any::<GetPlan>(),
            any::<LirId>(),
        )
            .prop_map(|(id, keys, plan, lir_id)| {
                PlanNode::<mz_repr::Timestamp>::Get {
                    id: Id::Global(id),
                    keys,
                    plan,
                }
                .as_plan(lir_id)
            });

        let leaf = prop::strategy::Union::new(vec![constant.boxed(), get.boxed()]).boxed();

        // `prop_recursive(depth, desired_size, expected_branch_size, recurse)`: here a
        // maximum recursion depth of 2, desired size 4 and expected branch size 5; see
        // proptest's `prop_recursive` documentation for the exact semantics.
        leaf.prop_recursive(2, 4, 5, |inner| {
            prop::strategy::Union::new(vec![
                // PlanNode::Let
                (
                    any::<LocalId>(),
                    inner.clone(),
                    inner.clone(),
                    any::<LirId>(),
                )
                    .prop_map(|(id, value, body, lir_id)| {
                        PlanNode::<mz_repr::Timestamp>::Let {
                            id,
                            value: value.into(),
                            body: body.into(),
                        }
                        .as_plan(lir_id)
                    })
                    .boxed(),
                // PlanNode::Mfp
                (
                    inner.clone(),
                    any::<MapFilterProject>(),
                    any::<Option<(Vec<MirScalarExpr>, Option<Row>)>>(),
                    any::<LirId>(),
                )
                    .prop_map(|(input, mfp, input_key_val, lir_id)| {
                        PlanNode::Mfp {
                            input: input.into(),
                            mfp,
                            input_key_val,
                        }
                        .as_plan(lir_id)
                    })
                    .boxed(),
                // PlanNode::FlatMap
                (
                    inner.clone(),
                    any::<TableFunc>(),
                    any::<Vec<MirScalarExpr>>(),
                    any::<MapFilterProject>(),
                    any::<Option<Vec<MirScalarExpr>>>(),
                    any::<LirId>(),
                )
                    .prop_map(|(input, func, exprs, mfp, input_key, lir_id)| {
                        PlanNode::FlatMap {
                            input_key,
                            input: input.into(),
                            exprs,
                            func,
                            mfp_after: mfp,
                        }
                        .as_plan(lir_id)
                    })
                    .boxed(),
                // PlanNode::Join, with zero or one input (`0..2` is exclusive).
                (
                    prop::collection::vec(inner.clone(), 0..2),
                    any::<JoinPlan>(),
                    any::<LirId>(),
                )
                    .prop_map(|(inputs, plan, lir_id)| {
                        PlanNode::Join { inputs, plan }.as_plan(lir_id)
                    })
                    .boxed(),
                // PlanNode::Reduce
                (
                    inner.clone(),
                    any::<KeyValPlan>(),
                    any::<ReducePlan>(),
                    any::<Option<Vec<MirScalarExpr>>>(),
                    any::<MapFilterProject>(),
                    any::<LirId>(),
                )
                    .prop_map(
                        |(input, key_val_plan, plan, input_key, mfp_after, lir_id)| {
                            PlanNode::Reduce {
                                input_key,
                                input: input.into(),
                                key_val_plan,
                                plan,
                                mfp_after,
                            }
                            .as_plan(lir_id)
                        },
                    )
                    .boxed(),
                // PlanNode::TopK
                (inner.clone(), any::<TopKPlan>(), any::<LirId>())
                    .prop_map(|(input, top_k_plan, lir_id)| {
                        PlanNode::TopK {
                            input: input.into(),
                            top_k_plan,
                        }
                        .as_plan(lir_id)
                    })
                    .boxed(),
                // PlanNode::Negate
                (inner.clone(), any::<LirId>())
                    .prop_map(|(x, lir_id)| PlanNode::Negate { input: x.into() }.as_plan(lir_id))
                    .boxed(),
                // PlanNode::Threshold
                (inner.clone(), any::<ThresholdPlan>(), any::<LirId>())
                    .prop_map(|(input, threshold_plan, lir_id)| {
                        PlanNode::Threshold {
                            input: input.into(),
                            threshold_plan,
                        }
                        .as_plan(lir_id)
                    })
                    .boxed(),
                // PlanNode::Union, with zero or one input (`0..2` is exclusive).
                (
                    prop::collection::vec(inner.clone(), 0..2),
                    any::<bool>(),
                    any::<LirId>(),
                )
                    .prop_map(|(x, b, lir_id)| {
                        PlanNode::Union {
                            inputs: x,
                            consolidate_output: b,
                        }
                        .as_plan(lir_id)
                    })
                    .boxed(),
                // PlanNode::ArrangeBy (last use of `inner`, so it is moved rather than cloned).
                (
                    inner,
                    any::<AvailableCollections>(),
                    any::<Option<Vec<MirScalarExpr>>>(),
                    any::<MapFilterProject>(),
                    any::<LirId>(),
                )
                    .prop_map(|(input, forms, input_key, input_mfp, lir_id)| {
                        PlanNode::ArrangeBy {
                            input_key,
                            input: input.into(),
                            input_mfp,
                            forms,
                        }
                        .as_plan(lir_id)
                    })
                    .boxed(),
            ])
        })
        .boxed()
    }
}
676
/// How a `Get` stage will be rendered.
// NOTE: variant order is significant for the derived `Ord`/`PartialOrd` — do not reorder.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum GetPlan {
    /// Simply pass input arrangements on to the next stage.
    PassArrangements,
    /// Using the supplied key, optionally seek the row, and apply the MFP.
    ///
    /// The fields are, in order: the arrangement key expressions, an optional row
    /// to seek to, and the MFP to apply.
    Arrangement(
        // Property tests generate keys with fewer than 3 expressions (`0..3` is exclusive).
        #[proptest(strategy = "prop::collection::vec(MirScalarExpr::arbitrary(), 0..3)")]
        Vec<MirScalarExpr>,
        Option<Row>,
        MapFilterProject,
    ),
    /// Scan the input collection (unarranged) and apply the MFP.
    Collection(MapFilterProject),
}
692
693impl RustType<ProtoGetPlan> for GetPlan {
694 fn into_proto(&self) -> ProtoGetPlan {
695 use proto_get_plan::Kind::*;
696
697 ProtoGetPlan {
698 kind: Some(match self {
699 GetPlan::PassArrangements => PassArrangements(()),
700 GetPlan::Arrangement(k, s, m) => {
701 Arrangement(proto_get_plan::ProtoGetPlanArrangement {
702 key: k.into_proto(),
703 seek: s.into_proto(),
704 mfp: Some(m.into_proto()),
705 })
706 }
707 GetPlan::Collection(mfp) => Collection(mfp.into_proto()),
708 }),
709 }
710 }
711
712 fn from_proto(proto: ProtoGetPlan) -> Result<Self, TryFromProtoError> {
713 use proto_get_plan::Kind::*;
714 use proto_get_plan::ProtoGetPlanArrangement;
715 match proto.kind {
716 Some(PassArrangements(())) => Ok(GetPlan::PassArrangements),
717 Some(Arrangement(ProtoGetPlanArrangement { key, seek, mfp })) => {
718 Ok(GetPlan::Arrangement(
719 key.into_rust()?,
720 seek.into_rust()?,
721 mfp.into_rust_if_some("ProtoGetPlanArrangement::mfp")?,
722 ))
723 }
724 Some(Collection(mfp)) => Ok(GetPlan::Collection(mfp.into_rust()?)),
725 None => Err(TryFromProtoError::missing_field("ProtoGetPlan::kind")),
726 }
727 }
728}
729
730impl RustType<ProtoLetRecLimit> for LetRecLimit {
731 fn into_proto(&self) -> ProtoLetRecLimit {
732 ProtoLetRecLimit {
733 max_iters: self.max_iters.get(),
734 return_at_limit: self.return_at_limit,
735 }
736 }
737
738 fn from_proto(proto: ProtoLetRecLimit) -> Result<Self, TryFromProtoError> {
739 Ok(LetRecLimit {
740 max_iters: NonZeroU64::new(proto.max_iters).expect("max_iters > 0"),
741 return_at_limit: proto.return_at_limit,
742 })
743 }
744}
745
746impl<T: timely::progress::Timestamp> Plan<T> {
747 /// Convert the dataflow description into one that uses render plans.
748 #[mz_ore::instrument(
749 target = "optimizer",
750 level = "debug",
751 fields(path.segment = "finalize_dataflow")
752 )]
753 pub fn finalize_dataflow(
754 desc: DataflowDescription<OptimizedMirRelationExpr>,
755 features: &OptimizerFeatures,
756 ) -> Result<DataflowDescription<Self>, String> {
757 // First, we lower the dataflow description from MIR to LIR.
758 let mut dataflow = Self::lower_dataflow(desc, features)?;
759
760 // Subsequently, we perform plan refinements for the dataflow.
761 Self::refine_source_mfps(&mut dataflow);
762
763 if features.enable_consolidate_after_union_negate {
764 Self::refine_union_negate_consolidation(&mut dataflow);
765 }
766
767 if dataflow.is_single_time() {
768 Self::refine_single_time_operator_selection(&mut dataflow);
769
770 // The relaxation of the `must_consolidate` flag performs an LIR-based
771 // analysis and transform under checked recursion. By a similar argument
772 // made in `from_mir`, we do not expect the recursion limit to be hit.
773 // However, if that happens, we propagate an error to the caller.
774 // To apply the transform, we first obtain monotonic source and index
775 // global IDs and add them to a `TransformConfig` instance.
776 let monotonic_ids = dataflow
777 .source_imports
778 .iter()
779 .filter_map(|(id, (_, monotonic, _upper))| if *monotonic { Some(id) } else { None })
780 .chain(
781 dataflow
782 .index_imports
783 .iter()
784 .filter_map(|(id, index_import)| {
785 if index_import.monotonic {
786 Some(id)
787 } else {
788 None
789 }
790 }),
791 )
792 .cloned()
793 .collect::<BTreeSet<_>>();
794
795 let config = TransformConfig { monotonic_ids };
796 Self::refine_single_time_consolidation(&mut dataflow, &config)?;
797 }
798
799 soft_assert_eq_no_log!(dataflow.check_invariants(), Ok(()));
800
801 mz_repr::explain::trace_plan(&dataflow);
802
803 Ok(dataflow)
804 }
805
806 /// Lowers the dataflow description from MIR to LIR. To this end, the
807 /// method collects all available arrangements and based on this information
808 /// creates plans for every object to be built for the dataflow.
809 #[mz_ore::instrument(
810 target = "optimizer",
811 level = "debug",
812 fields(path.segment ="mir_to_lir")
813 )]
814 fn lower_dataflow(
815 desc: DataflowDescription<OptimizedMirRelationExpr>,
816 features: &OptimizerFeatures,
817 ) -> Result<DataflowDescription<Self>, String> {
818 let context = lowering::Context::new(desc.debug_name.clone(), features);
819 let dataflow = context.lower(desc)?;
820
821 mz_repr::explain::trace_plan(&dataflow);
822
823 Ok(dataflow)
824 }
825
826 /// Refines the source instance descriptions for sources imported by `dataflow` to
827 /// push down common MFP expressions.
828 #[mz_ore::instrument(
829 target = "optimizer",
830 level = "debug",
831 fields(path.segment = "refine_source_mfps")
832 )]
833 fn refine_source_mfps(dataflow: &mut DataflowDescription<Self>) {
834 // Extract MFPs from Get operators for sources, and extract what we can for the source.
835 // For each source, we want to find `&mut MapFilterProject` for each `Get` expression.
836 for (source_id, (source, _monotonic, _upper)) in dataflow.source_imports.iter_mut() {
837 let mut identity_present = false;
838 let mut mfps = Vec::new();
839 for build_desc in dataflow.objects_to_build.iter_mut() {
840 let mut todo = vec![&mut build_desc.plan];
841 while let Some(expression) = todo.pop() {
842 let node = &mut expression.node;
843 if let PlanNode::Get { id, plan, .. } = node {
844 if *id == mz_expr::Id::Global(*source_id) {
845 match plan {
846 GetPlan::Collection(mfp) => mfps.push(mfp),
847 GetPlan::PassArrangements => {
848 identity_present = true;
849 }
850 GetPlan::Arrangement(..) => {
851 panic!("Surprising `GetPlan` for imported source: {:?}", plan);
852 }
853 }
854 }
855 } else {
856 todo.extend(node.children_mut());
857 }
858 }
859 }
860
861 // Direct exports of sources are possible, and prevent pushdown.
862 identity_present |= dataflow
863 .index_exports
864 .values()
865 .any(|(x, _)| x.on_id == *source_id);
866 identity_present |= dataflow.sink_exports.values().any(|x| x.from == *source_id);
867
868 if !identity_present && !mfps.is_empty() {
869 // Extract a common prefix `MapFilterProject` from `mfps`.
870 let common = MapFilterProject::extract_common(&mut mfps[..]);
871 // Apply common expressions to the source's `MapFilterProject`.
872 let mut mfp = if let Some(mfp) = source.arguments.operators.take() {
873 MapFilterProject::compose(mfp, common)
874 } else {
875 common
876 };
877 mfp.optimize();
878 source.arguments.operators = Some(mfp);
879 }
880 }
881 mz_repr::explain::trace_plan(dataflow);
882 }
883
884 /// Changes the `consolidate_output` flag of such Unions that have at least one Negated input.
885 #[mz_ore::instrument(
886 target = "optimizer",
887 level = "debug",
888 fields(path.segment = "refine_union_negate_consolidation")
889 )]
890 fn refine_union_negate_consolidation(dataflow: &mut DataflowDescription<Self>) {
891 for build_desc in dataflow.objects_to_build.iter_mut() {
892 let mut todo = vec![&mut build_desc.plan];
893 while let Some(expression) = todo.pop() {
894 let node = &mut expression.node;
895 match node {
896 PlanNode::Union {
897 inputs,
898 consolidate_output,
899 ..
900 } => {
901 if inputs
902 .iter()
903 .any(|input| matches!(input.node, PlanNode::Negate { .. }))
904 {
905 *consolidate_output = true;
906 }
907 }
908 _ => {}
909 }
910 todo.extend(node.children_mut());
911 }
912 }
913 mz_repr::explain::trace_plan(dataflow);
914 }
915
916 /// Refines the plans of objects to be built as part of `dataflow` to take advantage
917 /// of monotonic operators if the dataflow refers to a single-time, i.e., is for a
918 /// one-shot SELECT query.
919 #[mz_ore::instrument(
920 target = "optimizer",
921 level = "debug",
922 fields(path.segment = "refine_single_time_operator_selection")
923 )]
924 fn refine_single_time_operator_selection(dataflow: &mut DataflowDescription<Self>) {
925 // We should only reach here if we have a one-shot SELECT query, i.e.,
926 // a single-time dataflow.
927 assert!(dataflow.is_single_time());
928
929 // Upgrade single-time plans to monotonic.
930 for build_desc in dataflow.objects_to_build.iter_mut() {
931 let mut todo = vec![&mut build_desc.plan];
932 while let Some(expression) = todo.pop() {
933 let node = &mut expression.node;
934 match node {
935 PlanNode::Reduce { plan, .. } => {
936 // Upgrade non-monotonic hierarchical plans to monotonic with mandatory consolidation.
937 match plan {
938 ReducePlan::Collation(collation) => {
939 collation.as_monotonic(true);
940 }
941 ReducePlan::Hierarchical(hierarchical) => {
942 hierarchical.as_monotonic(true);
943 }
944 _ => {
945 // Nothing to do for other plans, and doing nothing is safe for future variants.
946 }
947 }
948 todo.extend(node.children_mut());
949 }
950 PlanNode::TopK { top_k_plan, .. } => {
951 top_k_plan.as_monotonic(true);
952 todo.extend(node.children_mut());
953 }
954 PlanNode::LetRec { body, .. } => {
955 // Only the non-recursive `body` is restricted to a single time.
956 todo.push(body);
957 }
958 _ => {
959 // Nothing to do for other expressions, and doing nothing is safe for future expressions.
960 todo.extend(node.children_mut());
961 }
962 }
963 }
964 }
965 mz_repr::explain::trace_plan(dataflow);
966 }
967
968 /// Refines the plans of objects to be built as part of a single-time `dataflow` to relax
969 /// the setting of the `must_consolidate` attribute of monotonic operators, if necessary,
970 /// whenever the input is deemed to be physically monotonic.
971 #[mz_ore::instrument(
972 target = "optimizer",
973 level = "debug",
974 fields(path.segment = "refine_single_time_consolidation")
975 )]
976 fn refine_single_time_consolidation(
977 dataflow: &mut DataflowDescription<Self>,
978 config: &TransformConfig,
979 ) -> Result<(), String> {
980 // We should only reach here if we have a one-shot SELECT query, i.e.,
981 // a single-time dataflow.
982 assert!(dataflow.is_single_time());
983
984 let transform = transform::RelaxMustConsolidate::<T>::new();
985 for build_desc in dataflow.objects_to_build.iter_mut() {
986 transform
987 .transform(config, &mut build_desc.plan)
988 .map_err(|_| "Maximum recursion limit error in consolidation relaxation.")?;
989 }
990 mz_repr::explain::trace_plan(dataflow);
991 Ok(())
992 }
993}
994
995impl<T> CollectionPlan for PlanNode<T> {
996 fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
997 match self {
998 PlanNode::Constant { rows: _ } => (),
999 PlanNode::Get {
1000 id,
1001 keys: _,
1002 plan: _,
1003 } => match id {
1004 Id::Global(id) => {
1005 out.insert(*id);
1006 }
1007 Id::Local(_) => (),
1008 },
1009 PlanNode::Let { id: _, value, body } => {
1010 value.depends_on_into(out);
1011 body.depends_on_into(out);
1012 }
1013 PlanNode::LetRec {
1014 ids: _,
1015 values,
1016 limits: _,
1017 body,
1018 } => {
1019 for value in values.iter() {
1020 value.depends_on_into(out);
1021 }
1022 body.depends_on_into(out);
1023 }
1024 PlanNode::Join { inputs, plan: _ }
1025 | PlanNode::Union {
1026 inputs,
1027 consolidate_output: _,
1028 } => {
1029 for input in inputs {
1030 input.depends_on_into(out);
1031 }
1032 }
1033 PlanNode::Mfp {
1034 input,
1035 mfp: _,
1036 input_key_val: _,
1037 }
1038 | PlanNode::FlatMap {
1039 input_key: _,
1040 input,
1041 exprs: _,
1042 func: _,
1043 mfp_after: _,
1044 }
1045 | PlanNode::ArrangeBy {
1046 input_key: _,
1047 input,
1048 input_mfp: _,
1049 forms: _,
1050 }
1051 | PlanNode::Reduce {
1052 input_key: _,
1053 input,
1054 key_val_plan: _,
1055 plan: _,
1056 mfp_after: _,
1057 }
1058 | PlanNode::TopK {
1059 input,
1060 top_k_plan: _,
1061 }
1062 | PlanNode::Negate { input }
1063 | PlanNode::Threshold {
1064 input,
1065 threshold_plan: _,
1066 } => {
1067 input.depends_on_into(out);
1068 }
1069 }
1070 }
1071}
1072
1073impl<T> CollectionPlan for Plan<T> {
1074 fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
1075 self.node.depends_on_into(out);
1076 }
1077}
1078
/// Computes bucket sizes, largest first, for the hierarchical decomposition of an operator,
/// based on how many rows are expected to share the same group key.
///
/// The sizes are the powers of 16 (16, 256, 4096, ...) that are strictly below the expected
/// group size. When `expected_group_size` is `None`, a size of 4 billion is assumed. An expected
/// size of 16 or less yields no buckets.
fn bucketing_of_expected_group_size(expected_group_size: Option<u64>) -> Vec<u64> {
    // NOTE(vmarcos): The fan-in of 16 defined below is used in the tuning advice built-in view
    // mz_introspection.mz_expected_group_size_advice.
    const FAN_IN: u64 = 16;

    // Plan for 4B records in the expected case if the user didn't specify a group size.
    let limit = expected_group_size.unwrap_or(4_000_000_000);

    // Powers of the fan-in balance how many inputs each layer receives from the layer below
    // against the total number of layers. `checked_mul` ends the sequence on overflow; an
    // overflowing size would not be below any `u64` limit anyway.
    let mut buckets: Vec<u64> =
        std::iter::successors(Some(FAN_IN), |&size| size.checked_mul(FAN_IN))
            .take_while(|&size| size < limit)
            .collect();

    // Callers expect the coarsest bucket first.
    buckets.reverse();
    buckets
}
1100
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;

    use super::*;

    // Both protobuf roundtrip properties share the same (small) case budget.
    proptest! {
        #![proptest_config(ProptestConfig::with_cases(10))]

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn available_collections_protobuf_roundtrip(original in any::<AvailableCollections>()) {
            let decoded = protobuf_roundtrip::<_, ProtoAvailableCollections>(&original);
            assert_ok!(decoded);
            assert_eq!(decoded.unwrap(), original);
        }

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn get_plan_protobuf_roundtrip(original in any::<GetPlan>()) {
            let decoded = protobuf_roundtrip::<_, ProtoGetPlan>(&original);
            assert_ok!(decoded);
            assert_eq!(decoded.unwrap(), original);
        }
    }
}