// mz_compute_types/plan.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An explicit representation of a rendering plan for provided dataflows.
11
12#![warn(missing_debug_implementations)]
13
14use std::collections::{BTreeMap, BTreeSet};
15
16use columnar::Columnar;
17use mz_expr::{
18 CollectionPlan, EvalError, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr,
19 OptimizedMirRelationExpr, TableFunc,
20};
21use mz_ore::soft_assert_eq_no_log;
22use mz_ore::str::Indent;
23use mz_repr::explain::text::text_string_at;
24use mz_repr::explain::{DummyHumanizer, ExplainConfig, ExprHumanizer, PlanRenderingContext};
25use mz_repr::optimize::OptimizerFeatures;
26use mz_repr::{Diff, GlobalId, Row};
27use serde::{Deserialize, Serialize};
28
29use crate::dataflows::DataflowDescription;
30use crate::plan::join::JoinPlan;
31use crate::plan::reduce::{KeyValPlan, ReducePlan};
32use crate::plan::threshold::ThresholdPlan;
33use crate::plan::top_k::TopKPlan;
34use crate::plan::transform::{Transform, TransformConfig};
35
36mod lowering;
37
38pub mod interpret;
39pub mod join;
40pub mod reduce;
41pub mod render_plan;
42pub mod threshold;
43pub mod top_k;
44pub mod transform;
45
46/// The forms in which an operator's output is available.
47///
48/// These forms may include "raw", meaning as a streamed collection, but also any
49/// number of "arranged" representations.
50///
51/// Each arranged representation is described by a `KeyValRowMapping`, or rather
52/// at the moment by its three fields in a triple. These fields explain how to form
53/// a "key" by applying some expressions to each row, how to select "values" from
54/// columns not explicitly captured by the key, and how to return to the original
55/// row from the concatenation of key and value. Further explanation is available
56/// in the documentation for `KeyValRowMapping`.
57#[derive(
58 Clone,
59 Debug,
60 Default,
61 Deserialize,
62 Eq,
63 Ord,
64 PartialEq,
65 PartialOrd,
66 Serialize
67)]
68pub struct AvailableCollections {
69 /// Whether the collection exists in unarranged form.
70 pub raw: bool,
71 /// The list of available arrangements, presented as a `KeyValRowMapping`,
72 /// but here represented by a triple `(to_key, to_val, to_row)` instead.
73 /// The documentation for `KeyValRowMapping` explains these fields better.
74 pub arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>,
75}
76
77impl AvailableCollections {
78 /// Represent a collection that has no arrangements.
79 pub fn new_raw() -> Self {
80 Self {
81 raw: true,
82 arranged: Vec::new(),
83 }
84 }
85
86 /// Represent a collection that is arranged in the specified ways.
87 pub fn new_arranged(arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>) -> Self {
88 assert!(
89 !arranged.is_empty(),
90 "Invariant violated: at least one collection must exist"
91 );
92 Self {
93 raw: false,
94 arranged,
95 }
96 }
97
98 /// Get some arrangement, if one exists.
99 pub fn arbitrary_arrangement(&self) -> Option<&(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
100 assert!(
101 self.raw || !self.arranged.is_empty(),
102 "Invariant violated: at least one collection must exist"
103 );
104 self.arranged.get(0)
105 }
106}
107
108/// An identifier for an LIR node.
109#[derive(
110 Clone,
111 Copy,
112 Debug,
113 Deserialize,
114 Eq,
115 Ord,
116 PartialEq,
117 PartialOrd,
118 Serialize,
119 Columnar
120)]
121pub struct LirId(u64);
122
impl LirId {
    /// Returns the raw `u64` backing this identifier.
    fn as_u64(&self) -> u64 {
        self.0
    }
}
128
129impl From<LirId> for u64 {
130 fn from(value: LirId) -> Self {
131 value.as_u64()
132 }
133}
134
135impl std::fmt::Display for LirId {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 write!(f, "{}", self.0)
138 }
139}
140
/// A rendering plan with as much conditional logic as possible removed.
///
/// Pairs a dataflow-local [`LirId`] with the operator ([`PlanNode`]) it names.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub struct Plan<T = mz_repr::Timestamp> {
    /// A dataflow-local identifier.
    pub lir_id: LirId,
    /// The underlying operator.
    pub node: PlanNode<T>,
}
149
/// The actual AST node of the `Plan`.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub enum PlanNode<T = mz_repr::Timestamp> {
    /// A collection containing a pre-determined collection.
    Constant {
        /// Explicit update triples for the collection.
        rows: Result<Vec<(Row, T, Diff)>, EvalError>,
    },
    /// A reference to a bound collection.
    ///
    /// This is commonly either an external reference to an existing source or
    /// maintained arrangement, or an internal reference to a `Let` identifier.
    Get {
        /// A global or local identifier naming the collection.
        id: Id,
        /// Arrangements that will be available.
        ///
        /// The collection will also be loaded if available, which it will
        /// not be for imported data, but which it may be for locally defined
        /// data.
        // TODO: Be more explicit about whether a collection is available,
        // although one can always produce it from an arrangement, and it
        // seems generally advantageous to do that instead (to avoid cloning
        // rows, by using `mfp` first on borrowed data).
        keys: AvailableCollections,
        /// The actions to take when introducing the collection.
        plan: GetPlan,
    },
    /// Binds `value` to `id`, and then results in `body` with that binding.
    ///
    /// This stage has the effect of sharing `value` across multiple possible
    /// uses in `body`, and is the only mechanism we have for sharing collection
    /// information across parts of a dataflow.
    ///
    /// The binding is not available outside of `body`.
    Let {
        /// The local identifier to be used, available to `body` as `Id::Local(id)`.
        id: LocalId,
        /// The collection that should be bound to `id`.
        value: Box<Plan<T>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan<T>>,
    },
    /// Binds `values` to `ids`, evaluates them potentially recursively, and returns `body`.
    ///
    /// All bindings are available to all bindings, and to `body`.
    /// The contents of each binding are initially empty, and then updated through a sequence
    /// of iterations in which each binding is updated in sequence, from the most recent values
    /// of all bindings.
    LetRec {
        /// The local identifiers to be used, available to `body` as `Id::Local(id)`.
        ids: Vec<LocalId>,
        /// The collections that should be bound to the corresponding entries of `ids`.
        values: Vec<Plan<T>>,
        /// Maximum number of iterations. See further info on the MIR `LetRec`.
        limits: Vec<Option<LetRecLimit>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan<T>>,
    },
    /// Map, Filter, and Project operators.
    ///
    /// This stage contains work that we would ideally like to fuse to other plan
    /// stages, but for practical reasons cannot. For example: threshold, topk,
    /// and sometimes reduce stages are not able to absorb this operator.
    Mfp {
        /// The input collection.
        input: Box<Plan<T>>,
        /// Linear operator to apply to each record.
        mfp: MapFilterProject,
        /// Whether the input is from an arrangement, and if so,
        /// whether we can seek to a specific value therein
        input_key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
    },
    /// A variable number of output records for each input record.
    ///
    /// This stage is a bit of a catch-all for logic that does not easily fit in
    /// map stages. This includes table valued functions, but also functions of
    /// multiple arguments, and functions that modify the sign of updates.
    ///
    /// This stage allows a `MapFilterProject` operator to be fused to its output,
    /// and this can be very important as otherwise the output of `func` is just
    /// appended to the input record, for as many outputs as it has. This has the
    /// unpleasant default behavior of repeating potentially large records that
    /// are being unpacked, producing quadratic output in those cases. Instead,
    /// in these cases use a `mfp` member that projects away these large fields.
    FlatMap {
        /// The particular arrangement of the input we expect to use,
        /// if any
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan<T>>,
        /// Expressions that for each row prepare the arguments to `func`.
        exprs: Vec<MirScalarExpr>,
        /// The variable-record emitting function.
        func: TableFunc,
        /// Linear operator to apply to each record produced by `func`.
        mfp_after: MapFilterProject,
    },
    /// A multiway relational equijoin, with fused map, filter, and projection.
    ///
    /// This stage performs a multiway join among `inputs`, using the equality
    /// constraints expressed in `plan`. The plan also describes the implementation
    /// strategy we will use, and any pushed down per-record work.
    Join {
        /// An ordered list of inputs that will be joined.
        inputs: Vec<Plan<T>>,
        /// Detailed information about the implementation of the join.
        ///
        /// This includes information about the implementation strategy, but also
        /// any map, filter, project work that we might follow the join with, but
        /// potentially pushed down into the implementation of the join.
        plan: JoinPlan,
    },
    /// Aggregation by key.
    Reduce {
        /// The particular arrangement of the input we expect to use,
        /// if any
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for changing input records into key, value pairs.
        key_val_plan: KeyValPlan,
        /// A plan for performing the reduce.
        ///
        /// The implementation of reduction has several different strategies based
        /// on the properties of the reduction, and the input itself. Please check
        /// out the documentation for this type for more detail.
        plan: ReducePlan,
        /// An MFP that must be applied to results. The projection part of this
        /// MFP must preserve the key for the reduction; otherwise, the results
        /// become undefined. Additionally, the MFP must be free from temporal
        /// predicates so that it can be readily evaluated.
        /// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]?
        mfp_after: MapFilterProject,
    },
    /// Key-based "Top K" operator, retaining the first K records in each group.
    TopK {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for performing the Top-K.
        ///
        /// The implementation of Top-K has several different strategies based
        /// on the properties of the Top-K, and the input itself. Please check
        /// out the documentation for this type for more detail.
        top_k_plan: TopKPlan,
    },
    /// Inverts the sign of each update.
    Negate {
        /// The input collection.
        input: Box<Plan<T>>,
    },
    /// Filters records that accumulate negatively.
    ///
    /// Although the operator suppresses updates, it is a stateful operator taking
    /// resources proportional to the number of records with non-zero accumulation.
    Threshold {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for performing the threshold.
        ///
        /// The implementation of threshold has several different strategies based
        /// on the properties of the threshold, and the input itself. Please check
        /// out the documentation for this type for more detail.
        threshold_plan: ThresholdPlan,
    },
    /// Adds the contents of the input collections.
    ///
    /// Importantly, this is *multiset* union, so the multiplicities of records will
    /// add. This is in contrast to *set* union, where the multiplicities would be
    /// capped at one. A set union can be formed with `Union` followed by `Reduce`
    /// implementing the "distinct" operator.
    Union {
        /// The input collections
        inputs: Vec<Plan<T>>,
        /// Whether to consolidate the output, e.g., cancel negated records.
        consolidate_output: bool,
    },
    /// The `input` plan, but with additional arrangements.
    ///
    /// This operator does not change the logical contents of `input`, but ensures
    /// that certain arrangements are available in the results. This operator can
    /// be important for e.g. the `Join` stage which benefits from multiple arrangements
    /// or to cap a `Plan` so that indexes can be exported.
    ArrangeBy {
        /// The key that must be used to access the input.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan<T>>,
        /// The MFP that must be applied to the input.
        input_mfp: MapFilterProject,
        /// A list of arrangement keys, and possibly a raw collection,
        /// that will be added to those of the input. Does not include
        /// any other existing arrangements.
        forms: AvailableCollections,
    },
}
348
349impl<T> PlanNode<T> {
350 /// Iterates through references to child expressions.
351 pub fn children(&self) -> impl Iterator<Item = &Plan<T>> {
352 let mut first = None;
353 let mut second = None;
354 let mut rest = None;
355 let mut last = None;
356
357 use PlanNode::*;
358 match self {
359 Constant { .. } | Get { .. } => (),
360 Let { value, body, .. } => {
361 first = Some(&**value);
362 second = Some(&**body);
363 }
364 LetRec { values, body, .. } => {
365 rest = Some(values);
366 last = Some(&**body);
367 }
368 Mfp { input, .. }
369 | FlatMap { input, .. }
370 | Reduce { input, .. }
371 | TopK { input, .. }
372 | Negate { input, .. }
373 | Threshold { input, .. }
374 | ArrangeBy { input, .. } => {
375 first = Some(&**input);
376 }
377 Join { inputs, .. } | Union { inputs, .. } => {
378 rest = Some(inputs);
379 }
380 }
381
382 first
383 .into_iter()
384 .chain(second)
385 .chain(rest.into_iter().flatten())
386 .chain(last)
387 }
388
389 /// Iterates through mutable references to child expressions.
390 pub fn children_mut(&mut self) -> impl Iterator<Item = &mut Plan<T>> {
391 let mut first = None;
392 let mut second = None;
393 let mut rest = None;
394 let mut last = None;
395
396 use PlanNode::*;
397 match self {
398 Constant { .. } | Get { .. } => (),
399 Let { value, body, .. } => {
400 first = Some(&mut **value);
401 second = Some(&mut **body);
402 }
403 LetRec { values, body, .. } => {
404 rest = Some(values);
405 last = Some(&mut **body);
406 }
407 Mfp { input, .. }
408 | FlatMap { input, .. }
409 | Reduce { input, .. }
410 | TopK { input, .. }
411 | Negate { input, .. }
412 | Threshold { input, .. }
413 | ArrangeBy { input, .. } => {
414 first = Some(&mut **input);
415 }
416 Join { inputs, .. } | Union { inputs, .. } => {
417 rest = Some(inputs);
418 }
419 }
420
421 first
422 .into_iter()
423 .chain(second)
424 .chain(rest.into_iter().flatten())
425 .chain(last)
426 }
427}
428
429impl<T> PlanNode<T> {
430 /// Attach an `lir_id` to a `PlanNode` to make a complete `Plan`.
431 pub fn as_plan(self, lir_id: LirId) -> Plan<T> {
432 Plan { lir_id, node: self }
433 }
434}
435
436impl Plan {
437 /// Pretty-print this [Plan] to a string.
438 pub fn pretty(&self) -> String {
439 let config = ExplainConfig::default();
440 self.debug_explain(&config, None)
441 }
442
443 /// Pretty-print this [Plan] to a string using a custom
444 /// [ExplainConfig] and an optionally provided [ExprHumanizer].
445 /// This is intended for debugging and tests, not users.
446 pub fn debug_explain(
447 &self,
448 config: &ExplainConfig,
449 humanizer: Option<&dyn ExprHumanizer>,
450 ) -> String {
451 text_string_at(self, || PlanRenderingContext {
452 indent: Indent::default(),
453 humanizer: humanizer.unwrap_or(&DummyHumanizer),
454 annotations: BTreeMap::default(),
455 config,
456 ambiguous_ids: BTreeSet::default(),
457 })
458 }
459}
460
461/// How a `Get` stage will be rendered.
462#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
463pub enum GetPlan {
464 /// Simply pass input arrangements on to the next stage.
465 PassArrangements,
466 /// Using the supplied key, optionally seek the row, and apply the MFP.
467 Arrangement(Vec<MirScalarExpr>, Option<Row>, MapFilterProject),
468 /// Scan the input collection (unarranged) and apply the MFP.
469 Collection(MapFilterProject),
470}
471
impl<T: timely::progress::Timestamp> Plan<T> {
    /// Convert the dataflow description into one that uses render plans.
    ///
    /// Lowers the MIR dataflow to LIR and then applies a sequence of
    /// plan refinements, some gated on `features` and on whether the
    /// dataflow is single-time (a one-shot SELECT).
    ///
    /// # Errors
    ///
    /// Returns an error if lowering or the single-time consolidation
    /// refinement fails (e.g., a recursion limit is exceeded).
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "finalize_dataflow")
    )]
    pub fn finalize_dataflow(
        desc: DataflowDescription<OptimizedMirRelationExpr>,
        features: &OptimizerFeatures,
    ) -> Result<DataflowDescription<Self>, String> {
        // First, we lower the dataflow description from MIR to LIR.
        let mut dataflow = Self::lower_dataflow(desc, features)?;

        // Subsequently, we perform plan refinements for the dataflow.
        Self::refine_source_mfps(&mut dataflow);

        if features.enable_consolidate_after_union_negate {
            Self::refine_union_negate_consolidation(&mut dataflow);
        }

        if dataflow.is_single_time() {
            Self::refine_single_time_operator_selection(&mut dataflow);

            // The relaxation of the `must_consolidate` flag performs an LIR-based
            // analysis and transform under checked recursion. By a similar argument
            // made in `from_mir`, we do not expect the recursion limit to be hit.
            // However, if that happens, we propagate an error to the caller.
            // To apply the transform, we first obtain monotonic source and index
            // global IDs and add them to a `TransformConfig` instance.
            let monotonic_ids = dataflow
                .source_imports
                .iter()
                .filter_map(|(id, source_import)| source_import.monotonic.then_some(*id))
                .chain(
                    dataflow
                        .index_imports
                        .iter()
                        .filter_map(|(_id, index_import)| {
                            if index_import.monotonic {
                                Some(index_import.desc.on_id)
                            } else {
                                None
                            }
                        }),
                )
                .collect::<BTreeSet<_>>();

            let config = TransformConfig { monotonic_ids };
            Self::refine_single_time_consolidation(&mut dataflow, &config)?;
        }

        soft_assert_eq_no_log!(dataflow.check_invariants(), Ok(()));

        mz_repr::explain::trace_plan(&dataflow);

        Ok(dataflow)
    }

    /// Lowers the dataflow description from MIR to LIR. To this end, the
    /// method collects all available arrangements and based on this information
    /// creates plans for every object to be built for the dataflow.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment ="mir_to_lir")
    )]
    fn lower_dataflow(
        desc: DataflowDescription<OptimizedMirRelationExpr>,
        features: &OptimizerFeatures,
    ) -> Result<DataflowDescription<Self>, String> {
        let context = lowering::Context::new(desc.debug_name.clone(), features);
        let dataflow = context.lower(desc)?;

        mz_repr::explain::trace_plan(&dataflow);

        Ok(dataflow)
    }

    /// Refines the source instance descriptions for sources imported by `dataflow` to
    /// push down common MFP expressions.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "refine_source_mfps")
    )]
    fn refine_source_mfps(dataflow: &mut DataflowDescription<Self>) {
        // Extract MFPs from Get operators for sources, and extract what we can for the source.
        // For each source, we want to find `&mut MapFilterProject` for each `Get` expression.
        for (source_id, source_import) in dataflow.source_imports.iter_mut() {
            let source = &mut source_import.desc;
            let mut identity_present = false;
            let mut mfps = Vec::new();
            // Walk every plan in the dataflow, collecting the MFPs of all
            // `Get`s that reference this source.
            for build_desc in dataflow.objects_to_build.iter_mut() {
                let mut todo = vec![&mut build_desc.plan];
                while let Some(expression) = todo.pop() {
                    let node = &mut expression.node;
                    if let PlanNode::Get { id, plan, .. } = node {
                        if *id == mz_expr::Id::Global(*source_id) {
                            match plan {
                                GetPlan::Collection(mfp) => mfps.push(mfp),
                                GetPlan::PassArrangements => {
                                    identity_present = true;
                                }
                                GetPlan::Arrangement(..) => {
                                    panic!("Surprising `GetPlan` for imported source: {:?}", plan);
                                }
                            }
                        }
                    } else {
                        // Note: a `Get` has no children, so no recursion is lost above.
                        todo.extend(node.children_mut());
                    }
                }
            }

            // Direct exports of sources are possible, and prevent pushdown.
            identity_present |= dataflow
                .index_exports
                .values()
                .any(|(x, _)| x.on_id == *source_id);
            identity_present |= dataflow.sink_exports.values().any(|x| x.from == *source_id);

            if !identity_present && !mfps.is_empty() {
                // Extract a common prefix `MapFilterProject` from `mfps`.
                let common = MapFilterProject::extract_common(&mut mfps[..]);
                // Apply common expressions to the source's `MapFilterProject`.
                let mut mfp = if let Some(mfp) = source.arguments.operators.take() {
                    MapFilterProject::compose(mfp, common)
                } else {
                    common
                };
                mfp.optimize();
                source.arguments.operators = Some(mfp);
            }
        }
        mz_repr::explain::trace_plan(dataflow);
    }

    /// Changes the `consolidate_output` flag of such Unions that have at least one Negated input.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "refine_union_negate_consolidation")
    )]
    fn refine_union_negate_consolidation(dataflow: &mut DataflowDescription<Self>) {
        for build_desc in dataflow.objects_to_build.iter_mut() {
            let mut todo = vec![&mut build_desc.plan];
            while let Some(expression) = todo.pop() {
                let node = &mut expression.node;
                match node {
                    PlanNode::Union {
                        inputs,
                        consolidate_output,
                        ..
                    } => {
                        // A union with any negated input may produce retractions
                        // that should be cancelled before downstream consumption.
                        if inputs
                            .iter()
                            .any(|input| matches!(input.node, PlanNode::Negate { .. }))
                        {
                            *consolidate_output = true;
                        }
                    }
                    _ => {}
                }
                todo.extend(node.children_mut());
            }
        }
        mz_repr::explain::trace_plan(dataflow);
    }

    /// Refines the plans of objects to be built as part of `dataflow` to take advantage
    /// of monotonic operators if the dataflow refers to a single-time, i.e., is for a
    /// one-shot SELECT query.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "refine_single_time_operator_selection")
    )]
    fn refine_single_time_operator_selection(dataflow: &mut DataflowDescription<Self>) {
        // We should only reach here if we have a one-shot SELECT query, i.e.,
        // a single-time dataflow.
        assert!(dataflow.is_single_time());

        // Upgrade single-time plans to monotonic.
        for build_desc in dataflow.objects_to_build.iter_mut() {
            let mut todo = vec![&mut build_desc.plan];
            while let Some(expression) = todo.pop() {
                let node = &mut expression.node;
                match node {
                    PlanNode::Reduce { plan, .. } => {
                        // Upgrade non-monotonic hierarchical plans to monotonic with mandatory consolidation.
                        match plan {
                            ReducePlan::Hierarchical(hierarchical) => {
                                hierarchical.as_monotonic(true);
                            }
                            _ => {
                                // Nothing to do for other plans, and doing nothing is safe for future variants.
                            }
                        }
                        todo.extend(node.children_mut());
                    }
                    PlanNode::TopK { top_k_plan, .. } => {
                        top_k_plan.as_monotonic(true);
                        todo.extend(node.children_mut());
                    }
                    PlanNode::LetRec { body, .. } => {
                        // Only the non-recursive `body` is restricted to a single time;
                        // the recursive bindings evolve across iterations and are skipped.
                        todo.push(body);
                    }
                    _ => {
                        // Nothing to do for other expressions, and doing nothing is safe for future expressions.
                        todo.extend(node.children_mut());
                    }
                }
            }
        }
        mz_repr::explain::trace_plan(dataflow);
    }

    /// Refines the plans of objects to be built as part of a single-time `dataflow` to relax
    /// the setting of the `must_consolidate` attribute of monotonic operators, if necessary,
    /// whenever the input is deemed to be physically monotonic.
    ///
    /// # Errors
    ///
    /// Returns an error if the transform's checked recursion limit is exceeded.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "refine_single_time_consolidation")
    )]
    fn refine_single_time_consolidation(
        dataflow: &mut DataflowDescription<Self>,
        config: &TransformConfig,
    ) -> Result<(), String> {
        // We should only reach here if we have a one-shot SELECT query, i.e.,
        // a single-time dataflow.
        assert!(dataflow.is_single_time());

        let transform = transform::RelaxMustConsolidate::<T>::new();
        for build_desc in dataflow.objects_to_build.iter_mut() {
            transform
                .transform(config, &mut build_desc.plan)
                .map_err(|_| "Maximum recursion limit error in consolidation relaxation.")?;
        }
        mz_repr::explain::trace_plan(dataflow);
        Ok(())
    }
}
717
718impl<T> CollectionPlan for PlanNode<T> {
719 fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
720 match self {
721 PlanNode::Constant { rows: _ } => (),
722 PlanNode::Get {
723 id,
724 keys: _,
725 plan: _,
726 } => match id {
727 Id::Global(id) => {
728 out.insert(*id);
729 }
730 Id::Local(_) => (),
731 },
732 PlanNode::Let { id: _, value, body } => {
733 value.depends_on_into(out);
734 body.depends_on_into(out);
735 }
736 PlanNode::LetRec {
737 ids: _,
738 values,
739 limits: _,
740 body,
741 } => {
742 for value in values.iter() {
743 value.depends_on_into(out);
744 }
745 body.depends_on_into(out);
746 }
747 PlanNode::Join { inputs, plan: _ }
748 | PlanNode::Union {
749 inputs,
750 consolidate_output: _,
751 } => {
752 for input in inputs {
753 input.depends_on_into(out);
754 }
755 }
756 PlanNode::Mfp {
757 input,
758 mfp: _,
759 input_key_val: _,
760 }
761 | PlanNode::FlatMap {
762 input_key: _,
763 input,
764 exprs: _,
765 func: _,
766 mfp_after: _,
767 }
768 | PlanNode::ArrangeBy {
769 input_key: _,
770 input,
771 input_mfp: _,
772 forms: _,
773 }
774 | PlanNode::Reduce {
775 input_key: _,
776 input,
777 key_val_plan: _,
778 plan: _,
779 mfp_after: _,
780 }
781 | PlanNode::TopK {
782 input,
783 top_k_plan: _,
784 }
785 | PlanNode::Negate { input }
786 | PlanNode::Threshold {
787 input,
788 threshold_plan: _,
789 } => {
790 input.depends_on_into(out);
791 }
792 }
793 }
794}
795
impl<T> CollectionPlan for Plan<T> {
    fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
        // The `lir_id` carries no dependencies; delegate to the operator node.
        self.node.depends_on_into(out);
    }
}
801
/// Returns bucket sizes, descending, suitable for hierarchical decomposition of an operator, based
/// on the expected number of rows that will have the same group key.
fn bucketing_of_expected_group_size(expected_group_size: Option<u64>) -> Vec<u64> {
    // Plan for 4B records in the expected case if the user didn't specify a group size.
    let limit = expected_group_size.unwrap_or(4_000_000_000);

    // Distribute buckets in powers of 16, so that we can strike a balance between how many inputs
    // each layer gets from the preceding layer, while also limiting the number of layers.
    //
    // NOTE(vmarcos): The fan-in of 16 defined below is used in the tuning advice built-in view
    // mz_introspection.mz_expected_group_size_advice.
    let mut buckets: Vec<u64> = std::iter::successors(Some(16u64), |b| b.checked_mul(16))
        .take_while(|bucket| *bucket < limit)
        .collect();

    // Largest bucket first.
    buckets.reverse();
    buckets
}