mz_compute_types/plan.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An explicit representation of a rendering plan for provided dataflows.
11
12#![warn(missing_debug_implementations)]
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::num::NonZeroU64;
16
17use columnar::Columnar;
18use mz_expr::{
19 CollectionPlan, EvalError, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr,
20 OptimizedMirRelationExpr, TableFunc,
21};
22use mz_ore::soft_assert_eq_no_log;
23use mz_ore::str::Indent;
24use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
25use mz_repr::explain::text::text_string_at;
26use mz_repr::explain::{DummyHumanizer, ExplainConfig, ExprHumanizer, PlanRenderingContext};
27use mz_repr::optimize::OptimizerFeatures;
28use mz_repr::{ColumnType, Diff, GlobalId, Row};
29use proptest::arbitrary::Arbitrary;
30use proptest::prelude::*;
31use proptest::strategy::Strategy;
32use proptest_derive::Arbitrary;
33use serde::{Deserialize, Serialize};
34
35use crate::dataflows::DataflowDescription;
36use crate::plan::join::JoinPlan;
37use crate::plan::proto_available_collections::ProtoColumnTypes;
38use crate::plan::reduce::{KeyValPlan, ReducePlan};
39use crate::plan::threshold::ThresholdPlan;
40use crate::plan::top_k::TopKPlan;
41use crate::plan::transform::{Transform, TransformConfig};
42
43mod lowering;
44
45pub mod interpret;
46pub mod join;
47pub mod reduce;
48pub mod render_plan;
49pub mod threshold;
50pub mod top_k;
51pub mod transform;
52
53include!(concat!(env!("OUT_DIR"), "/mz_compute_types.plan.rs"));
54
/// The forms in which an operator's output is available.
///
/// These forms may include "raw", meaning as a streamed collection, but also any
/// number of "arranged" representations.
///
/// Each arranged representation is described by a `KeyValRowMapping`, or rather
/// at the moment by its three fields in a triple. These fields explain how to form
/// a "key" by applying some expressions to each row, how to select "values" from
/// columns not explicitly captured by the key, and how to return to the original
/// row from the concatenation of key and value. Further explanation is available
/// in the documentation for `KeyValRowMapping`.
///
/// Note that the derived `Default` value has `raw: false` and no arrangements, i.e.
/// it describes a collection available in no form at all. The constructors
/// `AvailableCollections::new_raw` and `AvailableCollections::new_arranged`
/// produce values that satisfy the "at least one form exists" invariant.
#[derive(
    Arbitrary, Clone, Debug, Default, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize,
)]
pub struct AvailableCollections {
    /// Whether the collection exists in unarranged form.
    pub raw: bool,
    /// The list of available arrangements, presented as a `KeyValRowMapping`,
    /// but here represented by a triple `(to_key, to_val, to_row)` instead.
    /// The documentation for `KeyValRowMapping` explains these fields better.
    // Property tests generate at most two arrangements (`0..3` is half-open),
    // each drawn from the thinner `any_arranged_thin` strategy.
    #[proptest(strategy = "prop::collection::vec(any_arranged_thin(), 0..3)")]
    pub arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>,
    /// The types of the columns in the raw form of the collection, if known. We
    /// only capture types when necessary to support arrangement specialization,
    /// so this is only done for specific LIR operators during lowering.
    pub types: Option<Vec<ColumnType>>,
}
82
83/// A strategy that produces arrangements that are thinner than the default. That is
84/// the number of direct children is limited to a maximum of 3.
85pub(crate) fn any_arranged_thin()
86-> impl Strategy<Value = (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
87 (
88 prop::collection::vec(MirScalarExpr::arbitrary(), 0..3),
89 Vec::<usize>::arbitrary(),
90 Vec::<usize>::arbitrary(),
91 )
92}
93
94impl RustType<ProtoColumnTypes> for Vec<ColumnType> {
95 fn into_proto(&self) -> ProtoColumnTypes {
96 ProtoColumnTypes {
97 types: self.into_proto(),
98 }
99 }
100
101 fn from_proto(proto: ProtoColumnTypes) -> Result<Self, TryFromProtoError> {
102 proto.types.into_rust()
103 }
104}
105
106impl RustType<ProtoAvailableCollections> for AvailableCollections {
107 fn into_proto(&self) -> ProtoAvailableCollections {
108 ProtoAvailableCollections {
109 raw: self.raw,
110 arranged: self.arranged.into_proto(),
111 types: self.types.into_proto(),
112 }
113 }
114
115 fn from_proto(x: ProtoAvailableCollections) -> Result<Self, TryFromProtoError> {
116 Ok({
117 Self {
118 raw: x.raw,
119 arranged: x.arranged.into_rust()?,
120 types: x.types.into_rust()?,
121 }
122 })
123 }
124}
125
126impl AvailableCollections {
127 /// Represent a collection that has no arrangements.
128 pub fn new_raw() -> Self {
129 Self {
130 raw: true,
131 arranged: Vec::new(),
132 types: None,
133 }
134 }
135
136 /// Represent a collection that is arranged in the
137 /// specified ways, with optionally given types describing
138 /// the rows that would be in the raw form of the collection.
139 pub fn new_arranged(
140 arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>,
141 types: Option<Vec<ColumnType>>,
142 ) -> Self {
143 assert!(
144 !arranged.is_empty(),
145 "Invariant violated: at least one collection must exist"
146 );
147 Self {
148 raw: false,
149 arranged,
150 types,
151 }
152 }
153
154 /// Get some arrangement, if one exists.
155 pub fn arbitrary_arrangement(&self) -> Option<&(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
156 assert!(
157 self.raw || !self.arranged.is_empty(),
158 "Invariant violated: at least one collection must exist"
159 );
160 self.arranged.get(0)
161 }
162}
163
/// An identifier for an LIR node.
///
/// A thin wrapper around a `u64`. Within a [`Plan`] it serves as the
/// dataflow-local identifier of that plan node (see `Plan::lir_id`).
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize, Columnar)]
pub struct LirId(u64);
167
168impl LirId {
169 fn as_u64(&self) -> u64 {
170 self.0
171 }
172}
173
174impl From<LirId> for u64 {
175 fn from(value: LirId) -> Self {
176 value.as_u64()
177 }
178}
179
180impl std::fmt::Display for LirId {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 write!(f, "{}", self.0)
183 }
184}
185
186impl RustType<u64> for LirId {
187 fn into_proto(&self) -> u64 {
188 self.0
189 }
190
191 fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
192 Ok(Self(proto))
193 }
194}
195
/// A rendering plan with as much conditional logic as possible removed.
///
/// A plan is a tree: every [`PlanNode`] holds its inputs as further `Plan`s, so
/// each node in the tree carries its own [`LirId`]. Use [`PlanNode::as_plan`] to
/// attach an identifier to a node.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub struct Plan<T = mz_repr::Timestamp> {
    /// A dataflow-local identifier.
    pub lir_id: LirId,
    /// The underlying operator.
    pub node: PlanNode<T>,
}
204
/// The actual AST node of the `Plan`.
///
/// Child plans are held either boxed (`Box<Plan<T>>`) or in vectors
/// (`Vec<Plan<T>>`). [`PlanNode::children`] and [`PlanNode::children_mut`]
/// enumerate them.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub enum PlanNode<T = mz_repr::Timestamp> {
    /// A collection whose contents are fixed in the plan itself.
    Constant {
        /// Explicit `(row, time, diff)` update triples for the collection, or
        /// an `EvalError` in their place.
        rows: Result<Vec<(Row, T, Diff)>, EvalError>,
    },
    /// A reference to a bound collection.
    ///
    /// This is commonly either an external reference to an existing source or
    /// maintained arrangement, or an internal reference to a `Let` identifier.
    Get {
        /// A global or local identifier naming the collection.
        id: Id,
        /// Arrangements that will be available.
        ///
        /// The collection will also be loaded if available, which it will
        /// not be for imported data, but which it may be for locally defined
        /// data.
        // TODO: Be more explicit about whether a collection is available,
        // although one can always produce it from an arrangement, and it
        // seems generally advantageous to do that instead (to avoid cloning
        // rows, by using `mfp` first on borrowed data).
        keys: AvailableCollections,
        /// The actions to take when introducing the collection.
        plan: GetPlan,
    },
    /// Binds `value` to `id`, and then results in `body` with that binding.
    ///
    /// This stage has the effect of sharing `value` across multiple possible
    /// uses in `body`, and is the only mechanism we have for sharing collection
    /// information across parts of a dataflow.
    ///
    /// The binding is not available outside of `body`.
    Let {
        /// The local identifier to be used, available to `body` as `Id::Local(id)`.
        id: LocalId,
        /// The collection that should be bound to `id`.
        value: Box<Plan<T>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan<T>>,
    },
    /// Binds `values` to `ids`, evaluates them potentially recursively, and returns `body`.
    ///
    /// All bindings are available to all bindings, and to `body`.
    /// The contents of each binding are initially empty, and then updated through a sequence
    /// of iterations in which each binding is updated in sequence, from the most recent values
    /// of all bindings.
    LetRec {
        /// The local identifiers to be used, available to `body` as `Id::Local(id)`.
        ids: Vec<LocalId>,
        /// The collections that should be bound to `ids`
        /// (presumably positionally aligned with `ids` — verify against lowering).
        values: Vec<Plan<T>>,
        /// Optional maximum number of iterations, one entry per binding
        /// (presumably aligned with `ids`). See further info on the MIR `LetRec`.
        limits: Vec<Option<LetRecLimit>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan<T>>,
    },
    /// Map, Filter, and Project operators.
    ///
    /// This stage contains work that we would ideally like to fuse to other plan
    /// stages, but for practical reasons cannot. For example: threshold, topk,
    /// and sometimes reduce stages are not able to absorb this operator.
    Mfp {
        /// The input collection.
        input: Box<Plan<T>>,
        /// Linear operator to apply to each record.
        mfp: MapFilterProject,
        /// Whether the input is from an arrangement (`Some((key, _))`), and if so,
        /// whether we can seek to a specific value therein (`Some((_, Some(row)))`).
        input_key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
    },
    /// A variable number of output records for each input record.
    ///
    /// This stage is a bit of a catch-all for logic that does not easily fit in
    /// map stages. This includes table valued functions, but also functions of
    /// multiple arguments, and functions that modify the sign of updates.
    ///
    /// This stage allows a `MapFilterProject` operator to be fused to its output,
    /// and this can be very important as otherwise the output of `func` is just
    /// appended to the input record, for as many outputs as it has. This has the
    /// unpleasant default behavior of repeating potentially large records that
    /// are being unpacked, producing quadratic output in those cases. Instead,
    /// in these cases use a `mfp` member that projects away these large fields.
    FlatMap {
        /// The input collection.
        input: Box<Plan<T>>,
        /// The variable-record emitting function.
        func: TableFunc,
        /// Expressions that for each row prepare the arguments to `func`.
        exprs: Vec<MirScalarExpr>,
        /// Linear operator to apply to each record produced by `func`.
        mfp_after: MapFilterProject,
        /// The particular arrangement of the input we expect to use,
        /// if any.
        input_key: Option<Vec<MirScalarExpr>>,
    },
    /// A multiway relational equijoin, with fused map, filter, and projection.
    ///
    /// This stage performs a multiway join among `inputs`, using the equality
    /// constraints expressed in `plan`. The plan also describes the implementation
    /// strategy we will use, and any pushed down per-record work.
    Join {
        /// An ordered list of inputs that will be joined.
        inputs: Vec<Plan<T>>,
        /// Detailed information about the implementation of the join.
        ///
        /// This includes information about the implementation strategy, but also
        /// any map, filter, project work that we might follow the join with, but
        /// potentially pushed down into the implementation of the join.
        plan: JoinPlan,
    },
    /// Aggregation by key.
    Reduce {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for changing input records into key, value pairs.
        key_val_plan: KeyValPlan,
        /// A plan for performing the reduce.
        ///
        /// The implementation of reduction has several different strategies based
        /// on the properties of the reduction, and the input itself. Please check
        /// out the documentation for this type for more detail.
        plan: ReducePlan,
        /// The particular arrangement of the input we expect to use,
        /// if any.
        input_key: Option<Vec<MirScalarExpr>>,
        /// An MFP that must be applied to results. The projection part of this
        /// MFP must preserve the key for the reduction; otherwise, the results
        /// become undefined. Additionally, the MFP must be free from temporal
        /// predicates so that it can be readily evaluated.
        mfp_after: MapFilterProject,
    },
    /// Key-based "Top K" operator, retaining the first K records in each group.
    TopK {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for performing the Top-K.
        ///
        /// The implementation of Top-K has several different strategies based
        /// on the properties of the Top-K, and the input itself. Please check
        /// out the documentation for `TopKPlan` for more detail.
        top_k_plan: TopKPlan,
    },
    /// Inverts the sign of each update.
    Negate {
        /// The input collection.
        input: Box<Plan<T>>,
    },
    /// Filters records that accumulate negatively.
    ///
    /// Although the operator suppresses updates, it is a stateful operator taking
    /// resources proportional to the number of records with non-zero accumulation.
    Threshold {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for performing the threshold.
        ///
        /// The implementation of thresholding may use different strategies based
        /// on the properties of the input. Please check out the documentation for
        /// `ThresholdPlan` for more detail.
        threshold_plan: ThresholdPlan,
    },
    /// Adds the contents of the input collections.
    ///
    /// Importantly, this is *multiset* union, so the multiplicities of records will
    /// add. This is in contrast to *set* union, where the multiplicities would be
    /// capped at one. A set union can be formed with `Union` followed by `Reduce`
    /// implementing the "distinct" operator.
    Union {
        /// The input collections.
        inputs: Vec<Plan<T>>,
        /// Whether to consolidate the output, e.g., cancel negated records.
        consolidate_output: bool,
    },
    /// The `input` plan, but with additional arrangements.
    ///
    /// This operator does not change the logical contents of `input`, but ensures
    /// that certain arrangements are available in the results. This operator can
    /// be important for e.g. the `Join` stage which benefits from multiple arrangements
    /// or to cap a `Plan` so that indexes can be exported.
    ArrangeBy {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A list of arrangement keys, and possibly a raw collection,
        /// that will be added to those of the input.
        ///
        /// If any of these collection forms are already present in the input, they have no effect.
        forms: AvailableCollections,
        /// The key that must be used to access the input.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The MFP that must be applied to the input.
        input_mfp: MapFilterProject,
    },
}
403
404impl<T> PlanNode<T> {
405 /// Iterates through references to child expressions.
406 pub fn children(&self) -> impl Iterator<Item = &Plan<T>> {
407 let mut first = None;
408 let mut second = None;
409 let mut rest = None;
410 let mut last = None;
411
412 use PlanNode::*;
413 match self {
414 Constant { .. } | Get { .. } => (),
415 Let { value, body, .. } => {
416 first = Some(&**value);
417 second = Some(&**body);
418 }
419 LetRec { values, body, .. } => {
420 rest = Some(values);
421 last = Some(&**body);
422 }
423 Mfp { input, .. }
424 | FlatMap { input, .. }
425 | Reduce { input, .. }
426 | TopK { input, .. }
427 | Negate { input, .. }
428 | Threshold { input, .. }
429 | ArrangeBy { input, .. } => {
430 first = Some(&**input);
431 }
432 Join { inputs, .. } | Union { inputs, .. } => {
433 rest = Some(inputs);
434 }
435 }
436
437 first
438 .into_iter()
439 .chain(second)
440 .chain(rest.into_iter().flatten())
441 .chain(last)
442 }
443
444 /// Iterates through mutable references to child expressions.
445 pub fn children_mut(&mut self) -> impl Iterator<Item = &mut Plan<T>> {
446 let mut first = None;
447 let mut second = None;
448 let mut rest = None;
449 let mut last = None;
450
451 use PlanNode::*;
452 match self {
453 Constant { .. } | Get { .. } => (),
454 Let { value, body, .. } => {
455 first = Some(&mut **value);
456 second = Some(&mut **body);
457 }
458 LetRec { values, body, .. } => {
459 rest = Some(values);
460 last = Some(&mut **body);
461 }
462 Mfp { input, .. }
463 | FlatMap { input, .. }
464 | Reduce { input, .. }
465 | TopK { input, .. }
466 | Negate { input, .. }
467 | Threshold { input, .. }
468 | ArrangeBy { input, .. } => {
469 first = Some(&mut **input);
470 }
471 Join { inputs, .. } | Union { inputs, .. } => {
472 rest = Some(inputs);
473 }
474 }
475
476 first
477 .into_iter()
478 .chain(second)
479 .chain(rest.into_iter().flatten())
480 .chain(last)
481 }
482}
483
484impl<T> PlanNode<T> {
485 /// Attach an `lir_id` to a `PlanNode` to make a complete `Plan`.
486 pub fn as_plan(self, lir_id: LirId) -> Plan<T> {
487 Plan { lir_id, node: self }
488 }
489}
490
491impl Plan {
492 /// Pretty-print this [Plan] to a string.
493 pub fn pretty(&self) -> String {
494 let config = ExplainConfig::default();
495 self.explain(&config, None)
496 }
497
498 /// Pretty-print this [Plan] to a string using a custom
499 /// [ExplainConfig] and an optionally provided [ExprHumanizer].
500 pub fn explain(&self, config: &ExplainConfig, humanizer: Option<&dyn ExprHumanizer>) -> String {
501 text_string_at(self, || PlanRenderingContext {
502 indent: Indent::default(),
503 humanizer: humanizer.unwrap_or(&DummyHumanizer),
504 annotations: BTreeMap::default(),
505 config,
506 })
507 }
508}
509
510impl Arbitrary for LirId {
511 type Strategy = BoxedStrategy<LirId>;
512 type Parameters = ();
513
514 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
515 let lir_id = u64::arbitrary();
516 lir_id.prop_map(LirId).boxed()
517 }
518}
519
impl Arbitrary for Plan {
    type Strategy = BoxedStrategy<Plan>;
    type Parameters = ();

    /// Generates random (not necessarily well-typed) plan trees for property tests.
    ///
    /// Leaves are `Constant`s or `Get`s of a global id. Interior nodes are any
    /// other variant except `LetRec`, which is never generated. Every component
    /// of every node is drawn independently, so generated plans are not expected
    /// to be semantically valid (e.g. MFP arities need not match their inputs).
    ///
    /// NOTE(review): the order of strategies in each `Union` and of elements in
    /// each tuple determines RNG consumption, so reordering changes the values
    /// generated for a given seed.
    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        // Constant contents: zero or one update triple.
        // presumably `(1..5)` bounds the number of datums per generated row — verify.
        let row_diff = prop::collection::vec(
            (
                Row::arbitrary_with((1..5).into()),
                mz_repr::Timestamp::arbitrary(),
                Diff::arbitrary(),
            ),
            0..2,
        );
        // Either the updates above or an evaluation error.
        let rows = prop::result::maybe_ok(row_diff, EvalError::arbitrary());
        let constant = (rows, any::<LirId>()).prop_map(|(rows, lir_id)| {
            PlanNode::<mz_repr::Timestamp>::Constant { rows }.as_plan(lir_id)
        });

        // Only global ids are generated, so no `Get` refers to an unbound local.
        let get = (
            any::<GlobalId>(),
            any::<AvailableCollections>(),
            any::<GetPlan>(),
            any::<LirId>(),
        )
            .prop_map(|(id, keys, plan, lir_id)| {
                PlanNode::<mz_repr::Timestamp>::Get {
                    id: Id::Global(id),
                    keys,
                    plan,
                }
                .as_plan(lir_id)
            });

        let leaf = prop::strategy::Union::new(vec![constant.boxed(), get.boxed()]).boxed();

        // `prop_recursive(depth, desired_size, expected_branch_size, recurse)`:
        // at most 2 levels of recursion, keeping generated trees small.
        leaf.prop_recursive(2, 4, 5, |inner| {
            prop::strategy::Union::new(vec![
                //Plan::Let
                (
                    any::<LocalId>(),
                    inner.clone(),
                    inner.clone(),
                    any::<LirId>(),
                )
                    .prop_map(|(id, value, body, lir_id)| {
                        PlanNode::<mz_repr::Timestamp>::Let {
                            id,
                            value: value.into(),
                            body: body.into(),
                        }
                        .as_plan(lir_id)
                    })
                    .boxed(),
                //Plan::Mfp
                (
                    inner.clone(),
                    any::<MapFilterProject>(),
                    any::<Option<(Vec<MirScalarExpr>, Option<Row>)>>(),
                    any::<LirId>(),
                )
                    .prop_map(|(input, mfp, input_key_val, lir_id)| {
                        PlanNode::Mfp {
                            input: input.into(),
                            mfp,
                            input_key_val,
                        }
                        .as_plan(lir_id)
                    })
                    .boxed(),
                //Plan::FlatMap
                (
                    inner.clone(),
                    any::<TableFunc>(),
                    any::<Vec<MirScalarExpr>>(),
                    any::<MapFilterProject>(),
                    any::<Option<Vec<MirScalarExpr>>>(),
                    any::<LirId>(),
                )
                    .prop_map(|(input, func, exprs, mfp, input_key, lir_id)| {
                        PlanNode::FlatMap {
                            input: input.into(),
                            func,
                            exprs,
                            mfp_after: mfp,
                            input_key,
                        }
                        .as_plan(lir_id)
                    })
                    .boxed(),
                //Plan::Join (zero or one input, to keep trees small)
                (
                    prop::collection::vec(inner.clone(), 0..2),
                    any::<JoinPlan>(),
                    any::<LirId>(),
                )
                    .prop_map(|(inputs, plan, lir_id)| {
                        PlanNode::Join { inputs, plan }.as_plan(lir_id)
                    })
                    .boxed(),
                //Plan::Reduce
                (
                    inner.clone(),
                    any::<KeyValPlan>(),
                    any::<ReducePlan>(),
                    any::<Option<Vec<MirScalarExpr>>>(),
                    any::<MapFilterProject>(),
                    any::<LirId>(),
                )
                    .prop_map(
                        |(input, key_val_plan, plan, input_key, mfp_after, lir_id)| {
                            PlanNode::Reduce {
                                input: input.into(),
                                key_val_plan,
                                plan,
                                input_key,
                                mfp_after,
                            }
                            .as_plan(lir_id)
                        },
                    )
                    .boxed(),
                //Plan::TopK
                (inner.clone(), any::<TopKPlan>(), any::<LirId>())
                    .prop_map(|(input, top_k_plan, lir_id)| {
                        PlanNode::TopK {
                            input: input.into(),
                            top_k_plan,
                        }
                        .as_plan(lir_id)
                    })
                    .boxed(),
                //Plan::Negate
                (inner.clone(), any::<LirId>())
                    .prop_map(|(x, lir_id)| PlanNode::Negate { input: x.into() }.as_plan(lir_id))
                    .boxed(),
                //Plan::Threshold
                (inner.clone(), any::<ThresholdPlan>(), any::<LirId>())
                    .prop_map(|(input, threshold_plan, lir_id)| {
                        PlanNode::Threshold {
                            input: input.into(),
                            threshold_plan,
                        }
                        .as_plan(lir_id)
                    })
                    .boxed(),
                // Plan::Union (zero or one input, to keep trees small)
                (
                    prop::collection::vec(inner.clone(), 0..2),
                    any::<bool>(),
                    any::<LirId>(),
                )
                    .prop_map(|(x, b, lir_id)| {
                        PlanNode::Union {
                            inputs: x,
                            consolidate_output: b,
                        }
                        .as_plan(lir_id)
                    })
                    .boxed(),
                //Plan::ArrangeBy (last use of `inner`, so it is moved rather than cloned)
                (
                    inner,
                    any::<AvailableCollections>(),
                    any::<Option<Vec<MirScalarExpr>>>(),
                    any::<MapFilterProject>(),
                    any::<LirId>(),
                )
                    .prop_map(|(input, forms, input_key, input_mfp, lir_id)| {
                        PlanNode::ArrangeBy {
                            input: input.into(),
                            forms,
                            input_key,
                            input_mfp,
                        }
                        .as_plan(lir_id)
                    })
                    .boxed(),
            ])
        })
        .boxed()
    }
}
702
/// How a `Get` stage will be rendered.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum GetPlan {
    /// Simply pass input arrangements on to the next stage.
    PassArrangements,
    /// Using the supplied key, optionally seek the row, and apply the MFP.
    ///
    /// The fields are, in order: the arrangement key expressions, an optional
    /// row to seek to within the arrangement, and the MFP to apply.
    Arrangement(
        // Property tests generate at most two key expressions (`0..3` is half-open).
        #[proptest(strategy = "prop::collection::vec(MirScalarExpr::arbitrary(), 0..3)")]
        Vec<MirScalarExpr>,
        Option<Row>,
        MapFilterProject,
    ),
    /// Scan the input collection (unarranged) and apply the MFP.
    Collection(MapFilterProject),
}
718
719impl RustType<ProtoGetPlan> for GetPlan {
720 fn into_proto(&self) -> ProtoGetPlan {
721 use proto_get_plan::Kind::*;
722
723 ProtoGetPlan {
724 kind: Some(match self {
725 GetPlan::PassArrangements => PassArrangements(()),
726 GetPlan::Arrangement(k, s, m) => {
727 Arrangement(proto_get_plan::ProtoGetPlanArrangement {
728 key: k.into_proto(),
729 seek: s.into_proto(),
730 mfp: Some(m.into_proto()),
731 })
732 }
733 GetPlan::Collection(mfp) => Collection(mfp.into_proto()),
734 }),
735 }
736 }
737
738 fn from_proto(proto: ProtoGetPlan) -> Result<Self, TryFromProtoError> {
739 use proto_get_plan::Kind::*;
740 use proto_get_plan::ProtoGetPlanArrangement;
741 match proto.kind {
742 Some(PassArrangements(())) => Ok(GetPlan::PassArrangements),
743 Some(Arrangement(ProtoGetPlanArrangement { key, seek, mfp })) => {
744 Ok(GetPlan::Arrangement(
745 key.into_rust()?,
746 seek.into_rust()?,
747 mfp.into_rust_if_some("ProtoGetPlanArrangement::mfp")?,
748 ))
749 }
750 Some(Collection(mfp)) => Ok(GetPlan::Collection(mfp.into_rust()?)),
751 None => Err(TryFromProtoError::missing_field("ProtoGetPlan::kind")),
752 }
753 }
754}
755
756impl RustType<ProtoLetRecLimit> for LetRecLimit {
757 fn into_proto(&self) -> ProtoLetRecLimit {
758 ProtoLetRecLimit {
759 max_iters: self.max_iters.get(),
760 return_at_limit: self.return_at_limit,
761 }
762 }
763
764 fn from_proto(proto: ProtoLetRecLimit) -> Result<Self, TryFromProtoError> {
765 Ok(LetRecLimit {
766 max_iters: NonZeroU64::new(proto.max_iters).expect("max_iters > 0"),
767 return_at_limit: proto.return_at_limit,
768 })
769 }
770}
771
772impl<T: timely::progress::Timestamp> Plan<T> {
773 /// Convert the dataflow description into one that uses render plans.
774 #[mz_ore::instrument(
775 target = "optimizer",
776 level = "debug",
777 fields(path.segment = "finalize_dataflow")
778 )]
779 pub fn finalize_dataflow(
780 desc: DataflowDescription<OptimizedMirRelationExpr>,
781 features: &OptimizerFeatures,
782 ) -> Result<DataflowDescription<Self>, String> {
783 // First, we lower the dataflow description from MIR to LIR.
784 let mut dataflow = Self::lower_dataflow(desc, features)?;
785
786 // Subsequently, we perform plan refinements for the dataflow.
787 Self::refine_source_mfps(&mut dataflow);
788
789 if features.enable_consolidate_after_union_negate {
790 Self::refine_union_negate_consolidation(&mut dataflow);
791 }
792
793 if dataflow.is_single_time() {
794 Self::refine_single_time_operator_selection(&mut dataflow);
795
796 // The relaxation of the `must_consolidate` flag performs an LIR-based
797 // analysis and transform under checked recursion. By a similar argument
798 // made in `from_mir`, we do not expect the recursion limit to be hit.
799 // However, if that happens, we propagate an error to the caller.
800 // To apply the transform, we first obtain monotonic source and index
801 // global IDs and add them to a `TransformConfig` instance.
802 let monotonic_ids = dataflow
803 .source_imports
804 .iter()
805 .filter_map(|(id, (_, monotonic, _upper))| if *monotonic { Some(id) } else { None })
806 .chain(
807 dataflow
808 .index_imports
809 .iter()
810 .filter_map(|(id, index_import)| {
811 if index_import.monotonic {
812 Some(id)
813 } else {
814 None
815 }
816 }),
817 )
818 .cloned()
819 .collect::<BTreeSet<_>>();
820
821 let config = TransformConfig { monotonic_ids };
822 Self::refine_single_time_consolidation(&mut dataflow, &config)?;
823 }
824
825 soft_assert_eq_no_log!(dataflow.check_invariants(), Ok(()));
826
827 mz_repr::explain::trace_plan(&dataflow);
828
829 Ok(dataflow)
830 }
831
832 /// Lowers the dataflow description from MIR to LIR. To this end, the
833 /// method collects all available arrangements and based on this information
834 /// creates plans for every object to be built for the dataflow.
835 #[mz_ore::instrument(
836 target = "optimizer",
837 level = "debug",
838 fields(path.segment ="mir_to_lir")
839 )]
840 fn lower_dataflow(
841 desc: DataflowDescription<OptimizedMirRelationExpr>,
842 features: &OptimizerFeatures,
843 ) -> Result<DataflowDescription<Self>, String> {
844 let context = lowering::Context::new(desc.debug_name.clone(), features);
845 let dataflow = context.lower(desc)?;
846
847 mz_repr::explain::trace_plan(&dataflow);
848
849 Ok(dataflow)
850 }
851
852 /// Refines the source instance descriptions for sources imported by `dataflow` to
853 /// push down common MFP expressions.
854 #[mz_ore::instrument(
855 target = "optimizer",
856 level = "debug",
857 fields(path.segment = "refine_source_mfps")
858 )]
859 fn refine_source_mfps(dataflow: &mut DataflowDescription<Self>) {
860 // Extract MFPs from Get operators for sources, and extract what we can for the source.
861 // For each source, we want to find `&mut MapFilterProject` for each `Get` expression.
862 for (source_id, (source, _monotonic, _upper)) in dataflow.source_imports.iter_mut() {
863 let mut identity_present = false;
864 let mut mfps = Vec::new();
865 for build_desc in dataflow.objects_to_build.iter_mut() {
866 let mut todo = vec![&mut build_desc.plan];
867 while let Some(expression) = todo.pop() {
868 let node = &mut expression.node;
869 if let PlanNode::Get { id, plan, .. } = node {
870 if *id == mz_expr::Id::Global(*source_id) {
871 match plan {
872 GetPlan::Collection(mfp) => mfps.push(mfp),
873 GetPlan::PassArrangements => {
874 identity_present = true;
875 }
876 GetPlan::Arrangement(..) => {
877 panic!("Surprising `GetPlan` for imported source: {:?}", plan);
878 }
879 }
880 }
881 } else {
882 todo.extend(node.children_mut());
883 }
884 }
885 }
886
887 // Direct exports of sources are possible, and prevent pushdown.
888 identity_present |= dataflow
889 .index_exports
890 .values()
891 .any(|(x, _)| x.on_id == *source_id);
892 identity_present |= dataflow.sink_exports.values().any(|x| x.from == *source_id);
893
894 if !identity_present && !mfps.is_empty() {
895 // Extract a common prefix `MapFilterProject` from `mfps`.
896 let common = MapFilterProject::extract_common(&mut mfps[..]);
897 // Apply common expressions to the source's `MapFilterProject`.
898 let mut mfp = if let Some(mfp) = source.arguments.operators.take() {
899 MapFilterProject::compose(mfp, common)
900 } else {
901 common
902 };
903 mfp.optimize();
904 source.arguments.operators = Some(mfp);
905 }
906 }
907 mz_repr::explain::trace_plan(dataflow);
908 }
909
910 /// Changes the `consolidate_output` flag of such Unions that have at least one Negated input.
911 #[mz_ore::instrument(
912 target = "optimizer",
913 level = "debug",
914 fields(path.segment = "refine_union_negate_consolidation")
915 )]
916 fn refine_union_negate_consolidation(dataflow: &mut DataflowDescription<Self>) {
917 for build_desc in dataflow.objects_to_build.iter_mut() {
918 let mut todo = vec![&mut build_desc.plan];
919 while let Some(expression) = todo.pop() {
920 let node = &mut expression.node;
921 match node {
922 PlanNode::Union {
923 inputs,
924 consolidate_output,
925 ..
926 } => {
927 if inputs
928 .iter()
929 .any(|input| matches!(input.node, PlanNode::Negate { .. }))
930 {
931 *consolidate_output = true;
932 }
933 }
934 _ => {}
935 }
936 todo.extend(node.children_mut());
937 }
938 }
939 mz_repr::explain::trace_plan(dataflow);
940 }
941
942 /// Refines the plans of objects to be built as part of `dataflow` to take advantage
943 /// of monotonic operators if the dataflow refers to a single-time, i.e., is for a
944 /// one-shot SELECT query.
945 #[mz_ore::instrument(
946 target = "optimizer",
947 level = "debug",
948 fields(path.segment = "refine_single_time_operator_selection")
949 )]
950 fn refine_single_time_operator_selection(dataflow: &mut DataflowDescription<Self>) {
951 // We should only reach here if we have a one-shot SELECT query, i.e.,
952 // a single-time dataflow.
953 assert!(dataflow.is_single_time());
954
955 // Upgrade single-time plans to monotonic.
956 for build_desc in dataflow.objects_to_build.iter_mut() {
957 let mut todo = vec![&mut build_desc.plan];
958 while let Some(expression) = todo.pop() {
959 let node = &mut expression.node;
960 match node {
961 PlanNode::Reduce { plan, .. } => {
962 // Upgrade non-monotonic hierarchical plans to monotonic with mandatory consolidation.
963 match plan {
964 ReducePlan::Collation(collation) => {
965 collation.as_monotonic(true);
966 }
967 ReducePlan::Hierarchical(hierarchical) => {
968 hierarchical.as_monotonic(true);
969 }
970 _ => {
971 // Nothing to do for other plans, and doing nothing is safe for future variants.
972 }
973 }
974 todo.extend(node.children_mut());
975 }
976 PlanNode::TopK { top_k_plan, .. } => {
977 top_k_plan.as_monotonic(true);
978 todo.extend(node.children_mut());
979 }
980 PlanNode::LetRec { body, .. } => {
981 // Only the non-recursive `body` is restricted to a single time.
982 todo.push(body);
983 }
984 _ => {
985 // Nothing to do for other expressions, and doing nothing is safe for future expressions.
986 todo.extend(node.children_mut());
987 }
988 }
989 }
990 }
991 mz_repr::explain::trace_plan(dataflow);
992 }
993
994 /// Refines the plans of objects to be built as part of a single-time `dataflow` to relax
995 /// the setting of the `must_consolidate` attribute of monotonic operators, if necessary,
996 /// whenever the input is deemed to be physically monotonic.
997 #[mz_ore::instrument(
998 target = "optimizer",
999 level = "debug",
1000 fields(path.segment = "refine_single_time_consolidation")
1001 )]
1002 fn refine_single_time_consolidation(
1003 dataflow: &mut DataflowDescription<Self>,
1004 config: &TransformConfig,
1005 ) -> Result<(), String> {
1006 // We should only reach here if we have a one-shot SELECT query, i.e.,
1007 // a single-time dataflow.
1008 assert!(dataflow.is_single_time());
1009
1010 let transform = transform::RelaxMustConsolidate::<T>::new();
1011 for build_desc in dataflow.objects_to_build.iter_mut() {
1012 transform
1013 .transform(config, &mut build_desc.plan)
1014 .map_err(|_| "Maximum recursion limit error in consolidation relaxation.")?;
1015 }
1016 mz_repr::explain::trace_plan(dataflow);
1017 Ok(())
1018 }
1019}
1020
1021impl<T> CollectionPlan for PlanNode<T> {
1022 fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
1023 match self {
1024 PlanNode::Constant { rows: _ } => (),
1025 PlanNode::Get {
1026 id,
1027 keys: _,
1028 plan: _,
1029 } => match id {
1030 Id::Global(id) => {
1031 out.insert(*id);
1032 }
1033 Id::Local(_) => (),
1034 },
1035 PlanNode::Let { id: _, value, body } => {
1036 value.depends_on_into(out);
1037 body.depends_on_into(out);
1038 }
1039 PlanNode::LetRec {
1040 ids: _,
1041 values,
1042 limits: _,
1043 body,
1044 } => {
1045 for value in values.iter() {
1046 value.depends_on_into(out);
1047 }
1048 body.depends_on_into(out);
1049 }
1050 PlanNode::Join { inputs, plan: _ }
1051 | PlanNode::Union {
1052 inputs,
1053 consolidate_output: _,
1054 } => {
1055 for input in inputs {
1056 input.depends_on_into(out);
1057 }
1058 }
1059 PlanNode::Mfp {
1060 input,
1061 mfp: _,
1062 input_key_val: _,
1063 }
1064 | PlanNode::FlatMap {
1065 input,
1066 func: _,
1067 exprs: _,
1068 mfp_after: _,
1069 input_key: _,
1070 }
1071 | PlanNode::ArrangeBy {
1072 input,
1073 forms: _,
1074 input_key: _,
1075 input_mfp: _,
1076 }
1077 | PlanNode::Reduce {
1078 input,
1079 key_val_plan: _,
1080 plan: _,
1081 input_key: _,
1082 mfp_after: _,
1083 }
1084 | PlanNode::TopK {
1085 input,
1086 top_k_plan: _,
1087 }
1088 | PlanNode::Negate { input }
1089 | PlanNode::Threshold {
1090 input,
1091 threshold_plan: _,
1092 } => {
1093 input.depends_on_into(out);
1094 }
1095 }
1096 }
1097}
1098
1099impl<T> CollectionPlan for Plan<T> {
1100 fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
1101 self.node.depends_on_into(out);
1102 }
1103}
1104
/// Computes bucket sizes, largest first, for the hierarchical decomposition of an
/// operator, given the expected number of rows that share a group key.
///
/// The buckets are the successive powers of 16 (16, 256, 4096, ...) that lie strictly
/// below the expected group size. When `expected_group_size` is `None`, that size
/// defaults to 4,000,000,000. An expected group size of 16 or less yields no buckets.
fn bucketing_of_expected_group_size(expected_group_size: Option<u64>) -> Vec<u64> {
    // NOTE(vmarcos): The fan-in of 16 defined below is used in the tuning advice built-in view
    // mz_introspection.mz_expected_group_size_advice.
    const FAN_IN: u64 = 16;

    // Plan for 4B records in the expected case if the user didn't specify a group size.
    let limit = expected_group_size.unwrap_or(4_000_000_000);

    // Powers of 16 balance how many inputs each layer receives from the preceding layer
    // against the total number of layers. The multiplication saturates at `u64::MAX`,
    // which is never below `limit`, so `take_while` always ends the sequence.
    let mut buckets: Vec<u64> =
        std::iter::successors(Some(FAN_IN), |&size| Some(size.saturating_mul(FAN_IN)))
            .take_while(|&size| size < limit)
            .collect();

    buckets.reverse();
    buckets
}
1126
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;

    use super::*;

    // Protobuf round-trip property tests: a generated value converted to its protobuf
    // message and back must compare equal to the original. Both tests share one config
    // that limits each of them to 10 generated cases.
    proptest! {
        #![proptest_config(ProptestConfig::with_cases(10))]

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn available_collections_protobuf_roundtrip(expect in any::<AvailableCollections>() ) {
            let actual = protobuf_roundtrip::<_, ProtoAvailableCollections>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn get_plan_protobuf_roundtrip(expect in any::<GetPlan>()) {
            let actual = protobuf_roundtrip::<_, ProtoGetPlan>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}