// mz_compute_types/plan.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An explicit representation of a rendering plan for provided dataflows.
11
12#![warn(missing_debug_implementations)]
13
14use std::collections::{BTreeMap, BTreeSet};
15
16use columnar::Columnar;
17use mz_expr::{
18 CollectionPlan, EvalError, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr,
19 OptimizedMirRelationExpr, TableFunc,
20};
21use mz_ore::soft_assert_eq_no_log;
22use mz_ore::str::Indent;
23use mz_repr::explain::text::text_string_at;
24use mz_repr::explain::{DummyHumanizer, ExplainConfig, ExprHumanizer, PlanRenderingContext};
25use mz_repr::optimize::OptimizerFeatures;
26use mz_repr::{Diff, GlobalId, Row, Timestamp};
27use serde::{Deserialize, Serialize};
28
29use crate::dataflows::DataflowDescription;
30use crate::plan::join::JoinPlan;
31use crate::plan::reduce::{KeyValPlan, ReducePlan};
32use crate::plan::threshold::ThresholdPlan;
33use crate::plan::top_k::TopKPlan;
34use crate::plan::transform::{Transform, TransformConfig};
35
36mod lowering;
37
38pub mod interpret;
39pub mod join;
40pub mod reduce;
41pub mod render_plan;
42pub mod threshold;
43pub mod top_k;
44pub mod transform;
45
/// The forms in which an operator's output is available.
///
/// These forms may include "raw", meaning as a streamed collection, but also any
/// number of "arranged" representations.
///
/// Each arranged representation is described by a `KeyValRowMapping`, or rather
/// at the moment by its three fields in a triple. These fields explain how to form
/// a "key" by applying some expressions to each row, how to select "values" from
/// columns not explicitly captured by the key, and how to return to the original
/// row from the concatenation of key and value. Further explanation is available
/// in the documentation for `KeyValRowMapping`.
///
/// Invariant asserted by `arbitrary_arrangement`: `raw || !arranged.is_empty()`,
/// i.e., the collection exists in at least one form.
/// NOTE(review): the derived `Default` yields `raw: false` with no arrangements,
/// which violates that invariant — confirm `Default` is never used to build a
/// "live" value.
#[derive(
    Clone,
    Debug,
    Default,
    Deserialize,
    Eq,
    Ord,
    PartialEq,
    PartialOrd,
    Serialize
)]
pub struct AvailableCollections {
    /// Whether the collection exists in unarranged ("raw") form.
    pub raw: bool,
    /// The list of available arrangements, presented as a `KeyValRowMapping`,
    /// but here represented by a triple `(to_key, to_val, to_row)` instead.
    /// The documentation for `KeyValRowMapping` explains these fields better.
    pub arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>,
}
76
77impl AvailableCollections {
78 /// Represent a collection that has no arrangements.
79 pub fn new_raw() -> Self {
80 Self {
81 raw: true,
82 arranged: Vec::new(),
83 }
84 }
85
86 /// Represent a collection that is arranged in the specified ways.
87 pub fn new_arranged(arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>) -> Self {
88 assert!(
89 !arranged.is_empty(),
90 "Invariant violated: at least one collection must exist"
91 );
92 Self {
93 raw: false,
94 arranged,
95 }
96 }
97
98 /// Get some arrangement, if one exists.
99 pub fn arbitrary_arrangement(&self) -> Option<&(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
100 assert!(
101 self.raw || !self.arranged.is_empty(),
102 "Invariant violated: at least one collection must exist"
103 );
104 self.arranged.get(0)
105 }
106}
107
/// How to render the arrangements requested by an `ArrangeBy`.
///
/// Decided during LIR lowering and consumed by the renderer. The variant says what the
/// renderer will do, not what it knows about the input.
#[derive(
    Clone,
    Copy,
    Debug,
    Deserialize,
    Eq,
    Ord,
    PartialEq,
    PartialOrd,
    Serialize
)]
pub enum ArrangementStrategy {
    /// Form arrangements directly from the input collection.
    Direct,
    /// Insert temporal bucketing in front of the arrangement, to delay future-stamped
    /// updates (e.g., from `mz_now()` MFPs) until their bucket boundary releases them.
    /// Honoured only when `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is set; otherwise behaves like
    /// `Direct`.
    TemporalBucketing,
}
132
/// An identifier for an LIR node.
///
/// A newtype over `u64`; identifiers are dataflow-local (see `Plan::lir_id`).
/// `Display` renders just the underlying number.
#[derive(
    Clone,
    Copy,
    Debug,
    Deserialize,
    Eq,
    Ord,
    PartialEq,
    PartialOrd,
    Serialize,
    Columnar
)]
pub struct LirId(u64);
147
148impl LirId {
149 fn as_u64(&self) -> u64 {
150 self.0
151 }
152}
153
154impl From<LirId> for u64 {
155 fn from(value: LirId) -> Self {
156 value.as_u64()
157 }
158}
159
160impl std::fmt::Display for LirId {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 write!(f, "{}", self.0)
163 }
164}
165
/// A rendering plan with as much conditional logic as possible removed.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub struct Plan {
    /// A dataflow-local identifier (see [`LirId`]).
    pub lir_id: LirId,
    /// The underlying operator, including any child plans.
    pub node: PlanNode,
}
174
/// The actual AST node of the `Plan`.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub enum PlanNode {
    /// A collection with pre-determined contents.
    Constant {
        /// Explicit update triples for the collection, or the error the
        /// collection evaluates to.
        rows: Result<Vec<(Row, Timestamp, Diff)>, EvalError>,
    },
    /// A reference to a bound collection.
    ///
    /// This is commonly either an external reference to an existing source or
    /// maintained arrangement, or an internal reference to a `Let` identifier.
    Get {
        /// A global or local identifier naming the collection.
        id: Id,
        /// Arrangements that will be available.
        ///
        /// The collection will also be loaded if available, which it will
        /// not be for imported data, but which it may be for locally defined
        /// data.
        // TODO: Be more explicit about whether a collection is available,
        // although one can always produce it from an arrangement, and it
        // seems generally advantageous to do that instead (to avoid cloning
        // rows, by using `mfp` first on borrowed data).
        keys: AvailableCollections,
        /// The actions to take when introducing the collection.
        plan: GetPlan,
    },
    /// Binds `value` to `id`, and then results in `body` with that binding.
    ///
    /// This stage has the effect of sharing `value` across multiple possible
    /// uses in `body`, and is the only mechanism we have for sharing collection
    /// information across parts of a dataflow.
    ///
    /// The binding is not available outside of `body`.
    Let {
        /// The local identifier to be used, available to `body` as `Id::Local(id)`.
        id: LocalId,
        /// The collection that should be bound to `id`.
        value: Box<Plan>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan>,
    },
    /// Binds `values` to `ids`, evaluates them potentially recursively, and returns `body`.
    ///
    /// All bindings are available to all bindings, and to `body`.
    /// The contents of each binding are initially empty, and then updated through a sequence
    /// of iterations in which each binding is updated in sequence, from the most recent values
    /// of all bindings.
    LetRec {
        /// The local identifiers to be used, available to `body` as `Id::Local(id)`.
        ids: Vec<LocalId>,
        /// The collections that should be bound to the corresponding entries of
        /// `ids`.
        // NOTE(review): `ids`, `values`, and `limits` appear to be parallel
        // vectors of equal length — confirm at the lowering site.
        values: Vec<Plan>,
        /// Maximum number of iterations. See further info on the MIR `LetRec`.
        limits: Vec<Option<LetRecLimit>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan>,
    },
    /// Map, Filter, and Project operators.
    ///
    /// This stage contains work that we would ideally like to fuse to other plan
    /// stages, but for practical reasons cannot. For example: threshold, topk,
    /// and sometimes reduce stages are not able to absorb this operator.
    Mfp {
        /// The input collection.
        input: Box<Plan>,
        /// Linear operator to apply to each record.
        mfp: MapFilterProject,
        /// Whether the input is from an arrangement, and if so,
        /// whether we can seek to a specific value therein.
        input_key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
    },
    /// A variable number of output records for each input record.
    ///
    /// This stage is a bit of a catch-all for logic that does not easily fit in
    /// map stages. This includes table valued functions, but also functions of
    /// multiple arguments, and functions that modify the sign of updates.
    ///
    /// This stage allows a `MapFilterProject` operator to be fused to its output,
    /// and this can be very important as otherwise the output of `func` is just
    /// appended to the input record, for as many outputs as it has. This has the
    /// unpleasant default behavior of repeating potentially large records that
    /// are being unpacked, producing quadratic output in those cases. Instead,
    /// in these cases use a `mfp` member that projects away these large fields.
    FlatMap {
        /// The particular arrangement of the input we expect to use,
        /// if any.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan>,
        /// Expressions that for each row prepare the arguments to `func`.
        exprs: Vec<MirScalarExpr>,
        /// The variable-record emitting function.
        func: TableFunc,
        /// Linear operator to apply to each record produced by `func`.
        mfp_after: MapFilterProject,
    },
    /// A multiway relational equijoin, with fused map, filter, and projection.
    ///
    /// This stage performs a multiway join among `inputs`, using the equality
    /// constraints expressed in `plan`. The plan also describes the implementation
    /// strategy we will use, and any pushed down per-record work.
    Join {
        /// An ordered list of inputs that will be joined.
        inputs: Vec<Plan>,
        /// Detailed information about the implementation of the join.
        ///
        /// This includes information about the implementation strategy, but also
        /// any map, filter, project work that we might follow the join with, but
        /// potentially pushed down into the implementation of the join.
        plan: JoinPlan,
    },
    /// Aggregation by key.
    Reduce {
        /// The particular arrangement of the input we expect to use,
        /// if any.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan>,
        /// A plan for changing input records into key, value pairs.
        key_val_plan: KeyValPlan,
        /// A plan for performing the reduce.
        ///
        /// The implementation of reduction has several different strategies based
        /// on the properties of the reduction, and the input itself. Please check
        /// out the documentation for this type for more detail.
        plan: ReducePlan,
        /// An MFP that must be applied to results. The projection part of this
        /// MFP must preserve the key for the reduction; otherwise, the results
        /// become undefined. Additionally, the MFP must be free from temporal
        /// predicates so that it can be readily evaluated.
        /// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]?
        mfp_after: MapFilterProject,
    },
    /// Key-based "Top K" operator, retaining the first K records in each group.
    TopK {
        /// The input collection.
        input: Box<Plan>,
        /// A plan for performing the Top-K.
        ///
        /// The implementation of Top-K has several different strategies based
        /// on the properties of the Top-K, and the input itself. Please check
        /// out the documentation for this type for more detail.
        top_k_plan: TopKPlan,
    },
    /// Inverts the sign of each update.
    Negate {
        /// The input collection.
        input: Box<Plan>,
    },
    /// Filters records that accumulate negatively.
    ///
    /// Although the operator suppresses updates, it is a stateful operator taking
    /// resources proportional to the number of records with non-zero accumulation.
    Threshold {
        /// The input collection.
        input: Box<Plan>,
        /// A plan for performing the threshold.
        ///
        /// The implementation of threshold has several different strategies based
        /// on the properties of the threshold, and the input itself. Please check
        /// out the documentation for this type for more detail.
        threshold_plan: ThresholdPlan,
    },
    /// Adds the contents of the input collections.
    ///
    /// Importantly, this is *multiset* union, so the multiplicities of records will
    /// add. This is in contrast to *set* union, where the multiplicities would be
    /// capped at one. A set union can be formed with `Union` followed by `Reduce`
    /// implementing the "distinct" operator.
    Union {
        /// The input collections.
        inputs: Vec<Plan>,
        /// Whether to consolidate the output, e.g., cancel negated records.
        consolidate_output: bool,
    },
    /// The `input` plan, but with additional arrangements.
    ///
    /// This operator does not change the logical contents of `input`, but ensures
    /// that certain arrangements are available in the results. This operator can
    /// be important for e.g. the `Join` stage which benefits from multiple arrangements
    /// or to cap a `Plan` so that indexes can be exported.
    ArrangeBy {
        /// The key that must be used to access the input.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan>,
        /// The MFP that must be applied to the input.
        input_mfp: MapFilterProject,
        /// A list of arrangement keys, and possibly a raw collection,
        /// that will be added to those of the input. Does not include
        /// any other existing arrangements.
        forms: AvailableCollections,
        /// How the renderer should form the arrangements requested by `forms`.
        strategy: ArrangementStrategy,
    },
}
375
376impl PlanNode {
377 /// Iterates through references to child expressions.
378 pub fn children(&self) -> impl Iterator<Item = &Plan> {
379 let mut first = None;
380 let mut second = None;
381 let mut rest = None;
382 let mut last = None;
383
384 use PlanNode::*;
385 match self {
386 Constant { .. } | Get { .. } => (),
387 Let { value, body, .. } => {
388 first = Some(&**value);
389 second = Some(&**body);
390 }
391 LetRec { values, body, .. } => {
392 rest = Some(values);
393 last = Some(&**body);
394 }
395 Mfp { input, .. }
396 | FlatMap { input, .. }
397 | Reduce { input, .. }
398 | TopK { input, .. }
399 | Negate { input, .. }
400 | Threshold { input, .. }
401 | ArrangeBy { input, .. } => {
402 first = Some(&**input);
403 }
404 Join { inputs, .. } | Union { inputs, .. } => {
405 rest = Some(inputs);
406 }
407 }
408
409 first
410 .into_iter()
411 .chain(second)
412 .chain(rest.into_iter().flatten())
413 .chain(last)
414 }
415
416 /// Iterates through mutable references to child expressions.
417 pub fn children_mut(&mut self) -> impl Iterator<Item = &mut Plan> {
418 let mut first = None;
419 let mut second = None;
420 let mut rest = None;
421 let mut last = None;
422
423 use PlanNode::*;
424 match self {
425 Constant { .. } | Get { .. } => (),
426 Let { value, body, .. } => {
427 first = Some(&mut **value);
428 second = Some(&mut **body);
429 }
430 LetRec { values, body, .. } => {
431 rest = Some(values);
432 last = Some(&mut **body);
433 }
434 Mfp { input, .. }
435 | FlatMap { input, .. }
436 | Reduce { input, .. }
437 | TopK { input, .. }
438 | Negate { input, .. }
439 | Threshold { input, .. }
440 | ArrangeBy { input, .. } => {
441 first = Some(&mut **input);
442 }
443 Join { inputs, .. } | Union { inputs, .. } => {
444 rest = Some(inputs);
445 }
446 }
447
448 first
449 .into_iter()
450 .chain(second)
451 .chain(rest.into_iter().flatten())
452 .chain(last)
453 }
454}
455
456impl PlanNode {
457 /// Attach an `lir_id` to a `PlanNode` to make a complete `Plan`.
458 pub fn as_plan(self, lir_id: LirId) -> Plan {
459 Plan { lir_id, node: self }
460 }
461}
462
463impl Plan {
464 /// Pretty-print this [Plan] to a string.
465 pub fn pretty(&self) -> String {
466 let config = ExplainConfig::default();
467 self.debug_explain(&config, None)
468 }
469
470 /// Pretty-print this [Plan] to a string using a custom
471 /// [ExplainConfig] and an optionally provided [ExprHumanizer].
472 /// This is intended for debugging and tests, not users.
473 pub fn debug_explain(
474 &self,
475 config: &ExplainConfig,
476 humanizer: Option<&dyn ExprHumanizer>,
477 ) -> String {
478 text_string_at(self, || PlanRenderingContext {
479 indent: Indent::default(),
480 humanizer: humanizer.unwrap_or(&DummyHumanizer),
481 annotations: BTreeMap::default(),
482 config,
483 ambiguous_ids: BTreeSet::default(),
484 })
485 }
486}
487
/// How a `Get` stage will be rendered.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum GetPlan {
    /// Simply pass input arrangements on to the next stage.
    PassArrangements,
    /// Using the supplied key, optionally seek the row, and apply the MFP.
    ///
    /// Fields: the arrangement key, an optional row to seek to (presumably
    /// `None` means no seek — confirm at the rendering site), and the MFP to
    /// apply.
    Arrangement(Vec<MirScalarExpr>, Option<Row>, MapFilterProject),
    /// Scan the input collection (unarranged) and apply the MFP.
    Collection(MapFilterProject),
}
498
impl Plan {
    /// Convert the dataflow description into one that uses render plans.
    ///
    /// Lowers the MIR dataflow to LIR, then applies a sequence of LIR-level
    /// refinements (see the `refine_*` methods below). Errors are surfaced as
    /// `String`s, e.g., when a recursion limit is hit.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "finalize_dataflow")
    )]
    pub fn finalize_dataflow(
        desc: DataflowDescription<OptimizedMirRelationExpr>,
        features: &OptimizerFeatures,
    ) -> Result<DataflowDescription<Self>, String> {
        // First, we lower the dataflow description from MIR to LIR.
        let mut dataflow = Self::lower_dataflow(desc, features)?;

        // Subsequently, we perform plan refinements for the dataflow.
        Self::refine_source_mfps(&mut dataflow);

        if features.enable_consolidate_after_union_negate {
            Self::refine_union_negate_consolidation(&mut dataflow);
        }

        // Single-time dataflows (one-shot SELECTs) admit additional refinements:
        // monotonic operator variants and consolidation relaxation.
        if dataflow.is_single_time() {
            Self::refine_single_time_operator_selection(&mut dataflow);

            // The relaxation of the `must_consolidate` flag performs an LIR-based
            // analysis and transform under checked recursion. By a similar argument
            // made in `from_mir`, we do not expect the recursion limit to be hit.
            // However, if that happens, we propagate an error to the caller.
            // To apply the transform, we first obtain monotonic source and index
            // global IDs and add them to a `TransformConfig` instance.
            let monotonic_ids = dataflow
                .source_imports
                .iter()
                .filter_map(|(id, source_import)| source_import.monotonic.then_some(*id))
                .chain(
                    dataflow
                        .index_imports
                        .iter()
                        .filter_map(|(_id, index_import)| {
                            if index_import.monotonic {
                                Some(index_import.desc.on_id)
                            } else {
                                None
                            }
                        }),
                )
                .collect::<BTreeSet<_>>();

            let config = TransformConfig { monotonic_ids };
            Self::refine_single_time_consolidation(&mut dataflow, &config)?;
        }

        // Soft assertion: invariant violations are surfaced in testing builds only.
        soft_assert_eq_no_log!(dataflow.check_invariants(), Ok(()));

        mz_repr::explain::trace_plan(&dataflow);

        Ok(dataflow)
    }

    /// Lowers the dataflow description from MIR to LIR. To this end, the
    /// method collects all available arrangements and based on this information
    /// creates plans for every object to be built for the dataflow.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment ="mir_to_lir")
    )]
    fn lower_dataflow(
        desc: DataflowDescription<OptimizedMirRelationExpr>,
        features: &OptimizerFeatures,
    ) -> Result<DataflowDescription<Self>, String> {
        let context = lowering::Context::new(desc.debug_name.clone(), features);
        let dataflow = context.lower(desc)?;

        mz_repr::explain::trace_plan(&dataflow);

        Ok(dataflow)
    }

    /// Refines the source instance descriptions for sources imported by `dataflow` to
    /// push down common MFP expressions.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "refine_source_mfps")
    )]
    fn refine_source_mfps(dataflow: &mut DataflowDescription<Self>) {
        // Extract MFPs from Get operators for sources, and extract what we can for the source.
        // For each source, we want to find `&mut MapFilterProject` for each `Get` expression.
        for (source_id, source_import) in dataflow.source_imports.iter_mut() {
            let source = &mut source_import.desc;
            let mut identity_present = false;
            let mut mfps = Vec::new();
            // Depth-first walk over every plan, collecting the MFP of each `Get`
            // of this source. (`Get` nodes have no children, so not recursing
            // into matched `Get`s loses nothing.)
            for build_desc in dataflow.objects_to_build.iter_mut() {
                let mut todo = vec![&mut build_desc.plan];
                while let Some(expression) = todo.pop() {
                    let node = &mut expression.node;
                    if let PlanNode::Get { id, plan, .. } = node {
                        if *id == mz_expr::Id::Global(*source_id) {
                            match plan {
                                GetPlan::Collection(mfp) => mfps.push(mfp),
                                GetPlan::PassArrangements => {
                                    // An identity read of the source blocks pushdown.
                                    identity_present = true;
                                }
                                GetPlan::Arrangement(..) => {
                                    panic!("Surprising `GetPlan` for imported source: {:?}", plan);
                                }
                            }
                        }
                    } else {
                        todo.extend(node.children_mut());
                    }
                }
            }

            // Direct exports of sources are possible, and prevent pushdown.
            identity_present |= dataflow
                .index_exports
                .values()
                .any(|(x, _)| x.on_id == *source_id);
            identity_present |= dataflow.sink_exports.values().any(|x| x.from == *source_id);

            if !identity_present && !mfps.is_empty() {
                // Extract a common prefix `MapFilterProject` from `mfps`.
                let common = MapFilterProject::extract_common(&mut mfps[..]);
                // Apply common expressions to the source's `MapFilterProject`,
                // composing with any operators the source already carries.
                let mut mfp = if let Some(mfp) = source.arguments.operators.take() {
                    MapFilterProject::compose(mfp, common)
                } else {
                    common
                };
                mfp.optimize();
                source.arguments.operators = Some(mfp);
            }
        }
        mz_repr::explain::trace_plan(dataflow);
    }

    /// Changes the `consolidate_output` flag of such Unions that have at least one Negated input.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "refine_union_negate_consolidation")
    )]
    fn refine_union_negate_consolidation(dataflow: &mut DataflowDescription<Self>) {
        for build_desc in dataflow.objects_to_build.iter_mut() {
            let mut todo = vec![&mut build_desc.plan];
            while let Some(expression) = todo.pop() {
                let node = &mut expression.node;
                match node {
                    PlanNode::Union {
                        inputs,
                        consolidate_output,
                        ..
                    } => {
                        // A negated input may produce cancellations; request
                        // consolidation so they are actually cancelled.
                        if inputs
                            .iter()
                            .any(|input| matches!(input.node, PlanNode::Negate { .. }))
                        {
                            *consolidate_output = true;
                        }
                    }
                    _ => {}
                }
                todo.extend(node.children_mut());
            }
        }
        mz_repr::explain::trace_plan(dataflow);
    }

    /// Refines the plans of objects to be built as part of `dataflow` to take advantage
    /// of monotonic operators if the dataflow refers to a single-time, i.e., is for a
    /// one-shot SELECT query.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "refine_single_time_operator_selection")
    )]
    fn refine_single_time_operator_selection(dataflow: &mut DataflowDescription<Self>) {
        // We should only reach here if we have a one-shot SELECT query, i.e.,
        // a single-time dataflow.
        assert!(dataflow.is_single_time());

        // Upgrade single-time plans to monotonic.
        for build_desc in dataflow.objects_to_build.iter_mut() {
            let mut todo = vec![&mut build_desc.plan];
            while let Some(expression) = todo.pop() {
                let node = &mut expression.node;
                match node {
                    PlanNode::Reduce { plan, .. } => {
                        // Upgrade non-monotonic hierarchical plans to monotonic with mandatory consolidation.
                        match plan {
                            ReducePlan::Hierarchical(hierarchical) => {
                                hierarchical.as_monotonic(true);
                            }
                            _ => {
                                // Nothing to do for other plans, and doing nothing is safe for future variants.
                            }
                        }
                        todo.extend(node.children_mut());
                    }
                    PlanNode::TopK { top_k_plan, .. } => {
                        top_k_plan.as_monotonic(true);
                        todo.extend(node.children_mut());
                    }
                    PlanNode::LetRec { body, .. } => {
                        // Only the non-recursive `body` is restricted to a single time.
                        todo.push(body);
                    }
                    _ => {
                        // Nothing to do for other expressions, and doing nothing is safe for future expressions.
                        todo.extend(node.children_mut());
                    }
                }
            }
        }
        mz_repr::explain::trace_plan(dataflow);
    }

    /// Refines the plans of objects to be built as part of a single-time `dataflow` to relax
    /// the setting of the `must_consolidate` attribute of monotonic operators, if necessary,
    /// whenever the input is deemed to be physically monotonic.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "refine_single_time_consolidation")
    )]
    fn refine_single_time_consolidation(
        dataflow: &mut DataflowDescription<Self>,
        config: &TransformConfig,
    ) -> Result<(), String> {
        // We should only reach here if we have a one-shot SELECT query, i.e.,
        // a single-time dataflow.
        assert!(dataflow.is_single_time());

        let transform = transform::RelaxMustConsolidate;
        for build_desc in dataflow.objects_to_build.iter_mut() {
            transform
                .transform(config, &mut build_desc.plan)
                .map_err(|_| "Maximum recursion limit error in consolidation relaxation.")?;
        }
        mz_repr::explain::trace_plan(dataflow);
        Ok(())
    }
}
744
impl CollectionPlan for PlanNode {
    /// Accumulates into `out` the global identifiers of all collections this
    /// node (transitively) depends on, i.e., the ids of `Get` nodes with
    /// `Id::Global` identifiers.
    ///
    /// Note: every variant's fields are matched explicitly (`_` rather than
    /// `..`), so adding a field to a variant is a compile error here —
    /// presumably intentional, to force this method to be revisited.
    fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
        match self {
            PlanNode::Constant { rows: _ } => (),
            PlanNode::Get {
                id,
                keys: _,
                plan: _,
            } => match id {
                Id::Global(id) => {
                    out.insert(*id);
                }
                // Local bindings are dataflow-internal, not dependencies.
                Id::Local(_) => (),
            },
            PlanNode::Let { id: _, value, body } => {
                value.depends_on_into(out);
                body.depends_on_into(out);
            }
            PlanNode::LetRec {
                ids: _,
                values,
                limits: _,
                body,
            } => {
                for value in values.iter() {
                    value.depends_on_into(out);
                }
                body.depends_on_into(out);
            }
            PlanNode::Join { inputs, plan: _ }
            | PlanNode::Union {
                inputs,
                consolidate_output: _,
            } => {
                for input in inputs {
                    input.depends_on_into(out);
                }
            }
            // All remaining variants have exactly one input to recurse into.
            PlanNode::Mfp {
                input,
                mfp: _,
                input_key_val: _,
            }
            | PlanNode::FlatMap {
                input_key: _,
                input,
                exprs: _,
                func: _,
                mfp_after: _,
            }
            | PlanNode::ArrangeBy {
                input_key: _,
                input,
                input_mfp: _,
                forms: _,
                strategy: _,
            }
            | PlanNode::Reduce {
                input_key: _,
                input,
                key_val_plan: _,
                plan: _,
                mfp_after: _,
            }
            | PlanNode::TopK {
                input,
                top_k_plan: _,
            }
            | PlanNode::Negate { input }
            | PlanNode::Threshold {
                input,
                threshold_plan: _,
            } => {
                input.depends_on_into(out);
            }
        }
    }
}
823
impl CollectionPlan for Plan {
    fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
        // Delegate to the operator node; `lir_id` carries no dependencies.
        self.node.depends_on_into(out);
    }
}
829
/// Returns bucket sizes, descending, suitable for hierarchical decomposition of an operator, based
/// on the expected number of rows that will have the same group key.
fn bucketing_of_expected_group_size(expected_group_size: Option<u64>) -> Vec<u64> {
    // NOTE(vmarcos): The fan-in of 16 defined below is used in the tuning advice built-in view
    // mz_introspection.mz_expected_group_size_advice.
    //
    // Plan for 4B records in the expected case if the user didn't specify a group size.
    let limit = expected_group_size.unwrap_or(4_000_000_000);

    // Distribute buckets in powers of 16, so that we can strike a balance between how many inputs
    // each layer gets from the preceding layer, while also limiting the number of layers.
    // The successor chain ends once multiplication saturates, and `take_while` cuts it off at
    // the first power of 16 that reaches `limit`.
    let mut buckets: Vec<u64> = std::iter::successors(Some(16u64), |bucket| {
        let next = bucket.saturating_mul(16);
        (next > *bucket).then_some(next)
    })
    .take_while(|bucket| *bucket < limit)
    .collect();

    // Largest bucket first.
    buckets.reverse();
    buckets
}