mz_compute_types/plan.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An explicit representation of a rendering plan for provided dataflows.
11
12#![warn(missing_debug_implementations)]
13
14use std::collections::{BTreeMap, BTreeSet};
15
16use columnar::Columnar;
17use mz_expr::{
18 CollectionPlan, EvalError, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr,
19 OptimizedMirRelationExpr, TableFunc,
20};
21use mz_ore::soft_assert_eq_no_log;
22use mz_ore::str::Indent;
23use mz_repr::explain::text::text_string_at;
24use mz_repr::explain::{DummyHumanizer, ExplainConfig, ExprHumanizer, PlanRenderingContext};
25use mz_repr::optimize::OptimizerFeatures;
26use mz_repr::{Diff, GlobalId, Row};
27use serde::{Deserialize, Serialize};
28
29use crate::dataflows::DataflowDescription;
30use crate::plan::join::JoinPlan;
31use crate::plan::reduce::{KeyValPlan, ReducePlan};
32use crate::plan::threshold::ThresholdPlan;
33use crate::plan::top_k::TopKPlan;
34use crate::plan::transform::{Transform, TransformConfig};
35
36mod lowering;
37
38pub mod interpret;
39pub mod join;
40pub mod reduce;
41pub mod render_plan;
42pub mod threshold;
43pub mod top_k;
44pub mod transform;
45
/// The forms in which an operator's output is available.
///
/// These forms may include "raw", meaning as a streamed collection, but also any
/// number of "arranged" representations.
///
/// Each arranged representation is described by a `KeyValRowMapping`, or rather
/// at the moment by its three fields in a triple. These fields explain how to form
/// a "key" by applying some expressions to each row, how to select "values" from
/// columns not explicitly captured by the key, and how to return to the original
/// row from the concatenation of key and value. Further explanation is available
/// in the documentation for `KeyValRowMapping`.
///
/// The constructors and accessor of this type assert that at least one form
/// (raw or arranged) is present. Note that the derived `Default` value has
/// neither form (`raw == false` and no arrangements).
#[derive(Clone, Debug, Default, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub struct AvailableCollections {
    /// Whether the collection exists in unarranged form.
    pub raw: bool,
    /// The list of available arrangements, presented as a `KeyValRowMapping`,
    /// but here represented by a triple `(to_key, to_val, to_row)` instead.
    /// The documentation for `KeyValRowMapping` explains these fields better.
    pub arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>,
}
66
67impl AvailableCollections {
68 /// Represent a collection that has no arrangements.
69 pub fn new_raw() -> Self {
70 Self {
71 raw: true,
72 arranged: Vec::new(),
73 }
74 }
75
76 /// Represent a collection that is arranged in the specified ways.
77 pub fn new_arranged(arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>) -> Self {
78 assert!(
79 !arranged.is_empty(),
80 "Invariant violated: at least one collection must exist"
81 );
82 Self {
83 raw: false,
84 arranged,
85 }
86 }
87
88 /// Get some arrangement, if one exists.
89 pub fn arbitrary_arrangement(&self) -> Option<&(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
90 assert!(
91 self.raw || !self.arranged.is_empty(),
92 "Invariant violated: at least one collection must exist"
93 );
94 self.arranged.get(0)
95 }
96}
97
/// An identifier for an LIR node.
///
/// This is a thin wrapper around a `u64`. The wrapped value is private to this
/// module; it can be read back through the `From<LirId> for u64` conversion or
/// rendered through the `Display` implementation.
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize, Columnar)]
pub struct LirId(u64);
101
102impl LirId {
103 fn as_u64(&self) -> u64 {
104 self.0
105 }
106}
107
108impl From<LirId> for u64 {
109 fn from(value: LirId) -> Self {
110 value.as_u64()
111 }
112}
113
114impl std::fmt::Display for LirId {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 write!(f, "{}", self.0)
117 }
118}
119
/// A rendering plan with as much conditional logic as possible removed.
///
/// A `Plan` pairs an operator (`node`) with the identifier (`lir_id`) assigned
/// to it. Child plans are stored inside `node`, so a `Plan` is the root of a
/// tree of plans. The type parameter `T` is the timestamp type used by
/// constant collections and defaults to `mz_repr::Timestamp`.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub struct Plan<T = mz_repr::Timestamp> {
    /// A dataflow-local identifier.
    pub lir_id: LirId,
    /// The underlying operator.
    pub node: PlanNode<T>,
}
128
/// The actual AST node of the `Plan`.
///
/// Each variant describes one operator. Variants that consume other plans own
/// them directly (boxed for a single input, in a `Vec` for several inputs), so
/// a `PlanNode` together with its `lir_id` forms a tree of [`Plan`]s.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub enum PlanNode<T = mz_repr::Timestamp> {
    /// A collection containing a pre-determined collection.
    Constant {
        /// Explicit update triples for the collection, or the error that the
        /// collection evaluates to.
        rows: Result<Vec<(Row, T, Diff)>, EvalError>,
    },
    /// A reference to a bound collection.
    ///
    /// This is commonly either an external reference to an existing source or
    /// maintained arrangement, or an internal reference to a `Let` identifier.
    Get {
        /// A global or local identifier naming the collection.
        id: Id,
        /// Arrangements that will be available.
        ///
        /// The collection will also be loaded if available, which it will
        /// not be for imported data, but which it may be for locally defined
        /// data.
        // TODO: Be more explicit about whether a collection is available,
        // although one can always produce it from an arrangement, and it
        // seems generally advantageous to do that instead (to avoid cloning
        // rows, by using `mfp` first on borrowed data).
        keys: AvailableCollections,
        /// The actions to take when introducing the collection.
        plan: GetPlan,
    },
    /// Binds `value` to `id`, and then results in `body` with that binding.
    ///
    /// This stage has the effect of sharing `value` across multiple possible
    /// uses in `body`, and is the only mechanism we have for sharing collection
    /// information across parts of a dataflow.
    ///
    /// The binding is not available outside of `body`.
    Let {
        /// The local identifier to be used, available to `body` as `Id::Local(id)`.
        id: LocalId,
        /// The collection that should be bound to `id`.
        value: Box<Plan<T>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan<T>>,
    },
    /// Binds `values` to `ids`, evaluates them potentially recursively, and returns `body`.
    ///
    /// All bindings are available to all bindings, and to `body`.
    /// The contents of each binding are initially empty, and then updated through a sequence
    /// of iterations in which each binding is updated in sequence, from the most recent values
    /// of all bindings.
    LetRec {
        /// The local identifiers to be used, each available to `body` (and to
        /// all `values`) as `Id::Local(id)`.
        ids: Vec<LocalId>,
        /// The collections that should be bound to `ids`.
        values: Vec<Plan<T>>,
        /// Maximum number of iterations. See further info on the MIR `LetRec`.
        limits: Vec<Option<LetRecLimit>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)` for any `id` in `ids`.
        body: Box<Plan<T>>,
    },
    /// Map, Filter, and Project operators.
    ///
    /// This stage contains work that we would ideally like to fuse to other plan
    /// stages, but for practical reasons cannot. For example: threshold, topk,
    /// and sometimes reduce stages are not able to absorb this operator.
    Mfp {
        /// The input collection.
        input: Box<Plan<T>>,
        /// Linear operator to apply to each record.
        mfp: MapFilterProject,
        /// Whether the input is from an arrangement, and if so,
        /// whether we can seek to a specific value therein.
        input_key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
    },
    /// A variable number of output records for each input record.
    ///
    /// This stage is a bit of a catch-all for logic that does not easily fit in
    /// map stages. This includes table valued functions, but also functions of
    /// multiple arguments, and functions that modify the sign of updates.
    ///
    /// This stage allows a `MapFilterProject` operator to be fused to its output,
    /// and this can be very important as otherwise the output of `func` is just
    /// appended to the input record, for as many outputs as it has. This has the
    /// unpleasant default behavior of repeating potentially large records that
    /// are being unpacked, producing quadratic output in those cases. Instead,
    /// in these cases use a `mfp` member that projects away these large fields.
    FlatMap {
        /// The particular arrangement of the input we expect to use,
        /// if any.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan<T>>,
        /// Expressions that for each row prepare the arguments to `func`.
        exprs: Vec<MirScalarExpr>,
        /// The variable-record emitting function.
        func: TableFunc,
        /// Linear operator to apply to each record produced by `func`.
        mfp_after: MapFilterProject,
    },
    /// A multiway relational equijoin, with fused map, filter, and projection.
    ///
    /// This stage performs a multiway join among `inputs`, using the equality
    /// constraints expressed in `plan`. The plan also describes the implementation
    /// strategy we will use, and any pushed down per-record work.
    Join {
        /// An ordered list of inputs that will be joined.
        inputs: Vec<Plan<T>>,
        /// Detailed information about the implementation of the join.
        ///
        /// This includes information about the implementation strategy, but also
        /// any map, filter, project work that we might follow the join with, but
        /// potentially pushed down into the implementation of the join.
        plan: JoinPlan,
    },
    /// Aggregation by key.
    Reduce {
        /// The particular arrangement of the input we expect to use,
        /// if any.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for changing input records into key, value pairs.
        key_val_plan: KeyValPlan,
        /// A plan for performing the reduce.
        ///
        /// The implementation of reduction has several different strategies based
        /// on the properties of the reduction, and the input itself. Please check
        /// out the documentation for this type for more detail.
        plan: ReducePlan,
        /// An MFP that must be applied to results. The projection part of this
        /// MFP must preserve the key for the reduction; otherwise, the results
        /// become undefined. Additionally, the MFP must be free from temporal
        /// predicates so that it can be readily evaluated.
        /// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]?
        mfp_after: MapFilterProject,
    },
    /// Key-based "Top K" operator, retaining the first K records in each group.
    TopK {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for performing the Top-K.
        ///
        /// Please check out the documentation for [`TopKPlan`] for more detail
        /// on how the Top-K is implemented.
        top_k_plan: TopKPlan,
    },
    /// Inverts the sign of each update.
    Negate {
        /// The input collection.
        input: Box<Plan<T>>,
    },
    /// Filters records that accumulate negatively.
    ///
    /// Although the operator suppresses updates, it is a stateful operator taking
    /// resources proportional to the number of records with non-zero accumulation.
    Threshold {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for performing the threshold.
        ///
        /// Please check out the documentation for [`ThresholdPlan`] for more
        /// detail on how the threshold is implemented.
        threshold_plan: ThresholdPlan,
    },
    /// Adds the contents of the input collections.
    ///
    /// Importantly, this is *multiset* union, so the multiplicities of records will
    /// add. This is in contrast to *set* union, where the multiplicities would be
    /// capped at one. A set union can be formed with `Union` followed by `Reduce`
    /// implementing the "distinct" operator.
    Union {
        /// The input collections.
        inputs: Vec<Plan<T>>,
        /// Whether to consolidate the output, e.g., cancel negated records.
        consolidate_output: bool,
    },
    /// The `input` plan, but with additional arrangements.
    ///
    /// This operator does not change the logical contents of `input`, but ensures
    /// that certain arrangements are available in the results. This operator can
    /// be important for e.g. the `Join` stage which benefits from multiple arrangements
    /// or to cap a `Plan` so that indexes can be exported.
    ArrangeBy {
        /// The key that must be used to access the input.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan<T>>,
        /// The MFP that must be applied to the input.
        input_mfp: MapFilterProject,
        /// A list of arrangement keys, and possibly a raw collection,
        /// that will be added to those of the input. Does not include
        /// any other existing arrangements.
        forms: AvailableCollections,
    },
}
327
328impl<T> PlanNode<T> {
329 /// Iterates through references to child expressions.
330 pub fn children(&self) -> impl Iterator<Item = &Plan<T>> {
331 let mut first = None;
332 let mut second = None;
333 let mut rest = None;
334 let mut last = None;
335
336 use PlanNode::*;
337 match self {
338 Constant { .. } | Get { .. } => (),
339 Let { value, body, .. } => {
340 first = Some(&**value);
341 second = Some(&**body);
342 }
343 LetRec { values, body, .. } => {
344 rest = Some(values);
345 last = Some(&**body);
346 }
347 Mfp { input, .. }
348 | FlatMap { input, .. }
349 | Reduce { input, .. }
350 | TopK { input, .. }
351 | Negate { input, .. }
352 | Threshold { input, .. }
353 | ArrangeBy { input, .. } => {
354 first = Some(&**input);
355 }
356 Join { inputs, .. } | Union { inputs, .. } => {
357 rest = Some(inputs);
358 }
359 }
360
361 first
362 .into_iter()
363 .chain(second)
364 .chain(rest.into_iter().flatten())
365 .chain(last)
366 }
367
368 /// Iterates through mutable references to child expressions.
369 pub fn children_mut(&mut self) -> impl Iterator<Item = &mut Plan<T>> {
370 let mut first = None;
371 let mut second = None;
372 let mut rest = None;
373 let mut last = None;
374
375 use PlanNode::*;
376 match self {
377 Constant { .. } | Get { .. } => (),
378 Let { value, body, .. } => {
379 first = Some(&mut **value);
380 second = Some(&mut **body);
381 }
382 LetRec { values, body, .. } => {
383 rest = Some(values);
384 last = Some(&mut **body);
385 }
386 Mfp { input, .. }
387 | FlatMap { input, .. }
388 | Reduce { input, .. }
389 | TopK { input, .. }
390 | Negate { input, .. }
391 | Threshold { input, .. }
392 | ArrangeBy { input, .. } => {
393 first = Some(&mut **input);
394 }
395 Join { inputs, .. } | Union { inputs, .. } => {
396 rest = Some(inputs);
397 }
398 }
399
400 first
401 .into_iter()
402 .chain(second)
403 .chain(rest.into_iter().flatten())
404 .chain(last)
405 }
406}
407
408impl<T> PlanNode<T> {
409 /// Attach an `lir_id` to a `PlanNode` to make a complete `Plan`.
410 pub fn as_plan(self, lir_id: LirId) -> Plan<T> {
411 Plan { lir_id, node: self }
412 }
413}
414
415impl Plan {
416 /// Pretty-print this [Plan] to a string.
417 pub fn pretty(&self) -> String {
418 let config = ExplainConfig::default();
419 self.explain(&config, None)
420 }
421
422 /// Pretty-print this [Plan] to a string using a custom
423 /// [ExplainConfig] and an optionally provided [ExprHumanizer].
424 pub fn explain(&self, config: &ExplainConfig, humanizer: Option<&dyn ExprHumanizer>) -> String {
425 text_string_at(self, || PlanRenderingContext {
426 indent: Indent::default(),
427 humanizer: humanizer.unwrap_or(&DummyHumanizer),
428 annotations: BTreeMap::default(),
429 config,
430 })
431 }
432}
433
/// How a `Get` stage will be rendered.
///
/// This is stored in the `plan` field of `PlanNode::Get`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum GetPlan {
    /// Simply pass input arrangements on to the next stage.
    PassArrangements,
    /// Using the supplied key, optionally seek the row, and apply the MFP.
    ///
    /// The fields are, in order: the key, the optional row to seek to, and
    /// the MFP to apply.
    Arrangement(Vec<MirScalarExpr>, Option<Row>, MapFilterProject),
    /// Scan the input collection (unarranged) and apply the MFP.
    Collection(MapFilterProject),
}
444
445impl<T: timely::progress::Timestamp> Plan<T> {
446 /// Convert the dataflow description into one that uses render plans.
447 #[mz_ore::instrument(
448 target = "optimizer",
449 level = "debug",
450 fields(path.segment = "finalize_dataflow")
451 )]
452 pub fn finalize_dataflow(
453 desc: DataflowDescription<OptimizedMirRelationExpr>,
454 features: &OptimizerFeatures,
455 ) -> Result<DataflowDescription<Self>, String> {
456 // First, we lower the dataflow description from MIR to LIR.
457 let mut dataflow = Self::lower_dataflow(desc, features)?;
458
459 // Subsequently, we perform plan refinements for the dataflow.
460 Self::refine_source_mfps(&mut dataflow);
461
462 if features.enable_consolidate_after_union_negate {
463 Self::refine_union_negate_consolidation(&mut dataflow);
464 }
465
466 if dataflow.is_single_time() {
467 Self::refine_single_time_operator_selection(&mut dataflow);
468
469 // The relaxation of the `must_consolidate` flag performs an LIR-based
470 // analysis and transform under checked recursion. By a similar argument
471 // made in `from_mir`, we do not expect the recursion limit to be hit.
472 // However, if that happens, we propagate an error to the caller.
473 // To apply the transform, we first obtain monotonic source and index
474 // global IDs and add them to a `TransformConfig` instance.
475 let monotonic_ids = dataflow
476 .source_imports
477 .iter()
478 .filter_map(
479 |(id, (_, monotonic, _upper))| if *monotonic { Some(*id) } else { None },
480 )
481 .chain(
482 dataflow
483 .index_imports
484 .iter()
485 .filter_map(|(_id, index_import)| {
486 if index_import.monotonic {
487 Some(index_import.desc.on_id)
488 } else {
489 None
490 }
491 }),
492 )
493 .collect::<BTreeSet<_>>();
494
495 let config = TransformConfig { monotonic_ids };
496 Self::refine_single_time_consolidation(&mut dataflow, &config)?;
497 }
498
499 soft_assert_eq_no_log!(dataflow.check_invariants(), Ok(()));
500
501 mz_repr::explain::trace_plan(&dataflow);
502
503 Ok(dataflow)
504 }
505
506 /// Lowers the dataflow description from MIR to LIR. To this end, the
507 /// method collects all available arrangements and based on this information
508 /// creates plans for every object to be built for the dataflow.
509 #[mz_ore::instrument(
510 target = "optimizer",
511 level = "debug",
512 fields(path.segment ="mir_to_lir")
513 )]
514 fn lower_dataflow(
515 desc: DataflowDescription<OptimizedMirRelationExpr>,
516 features: &OptimizerFeatures,
517 ) -> Result<DataflowDescription<Self>, String> {
518 let context = lowering::Context::new(desc.debug_name.clone(), features);
519 let dataflow = context.lower(desc)?;
520
521 mz_repr::explain::trace_plan(&dataflow);
522
523 Ok(dataflow)
524 }
525
526 /// Refines the source instance descriptions for sources imported by `dataflow` to
527 /// push down common MFP expressions.
528 #[mz_ore::instrument(
529 target = "optimizer",
530 level = "debug",
531 fields(path.segment = "refine_source_mfps")
532 )]
533 fn refine_source_mfps(dataflow: &mut DataflowDescription<Self>) {
534 // Extract MFPs from Get operators for sources, and extract what we can for the source.
535 // For each source, we want to find `&mut MapFilterProject` for each `Get` expression.
536 for (source_id, (source, _monotonic, _upper)) in dataflow.source_imports.iter_mut() {
537 let mut identity_present = false;
538 let mut mfps = Vec::new();
539 for build_desc in dataflow.objects_to_build.iter_mut() {
540 let mut todo = vec![&mut build_desc.plan];
541 while let Some(expression) = todo.pop() {
542 let node = &mut expression.node;
543 if let PlanNode::Get { id, plan, .. } = node {
544 if *id == mz_expr::Id::Global(*source_id) {
545 match plan {
546 GetPlan::Collection(mfp) => mfps.push(mfp),
547 GetPlan::PassArrangements => {
548 identity_present = true;
549 }
550 GetPlan::Arrangement(..) => {
551 panic!("Surprising `GetPlan` for imported source: {:?}", plan);
552 }
553 }
554 }
555 } else {
556 todo.extend(node.children_mut());
557 }
558 }
559 }
560
561 // Direct exports of sources are possible, and prevent pushdown.
562 identity_present |= dataflow
563 .index_exports
564 .values()
565 .any(|(x, _)| x.on_id == *source_id);
566 identity_present |= dataflow.sink_exports.values().any(|x| x.from == *source_id);
567
568 if !identity_present && !mfps.is_empty() {
569 // Extract a common prefix `MapFilterProject` from `mfps`.
570 let common = MapFilterProject::extract_common(&mut mfps[..]);
571 // Apply common expressions to the source's `MapFilterProject`.
572 let mut mfp = if let Some(mfp) = source.arguments.operators.take() {
573 MapFilterProject::compose(mfp, common)
574 } else {
575 common
576 };
577 mfp.optimize();
578 source.arguments.operators = Some(mfp);
579 }
580 }
581 mz_repr::explain::trace_plan(dataflow);
582 }
583
584 /// Changes the `consolidate_output` flag of such Unions that have at least one Negated input.
585 #[mz_ore::instrument(
586 target = "optimizer",
587 level = "debug",
588 fields(path.segment = "refine_union_negate_consolidation")
589 )]
590 fn refine_union_negate_consolidation(dataflow: &mut DataflowDescription<Self>) {
591 for build_desc in dataflow.objects_to_build.iter_mut() {
592 let mut todo = vec![&mut build_desc.plan];
593 while let Some(expression) = todo.pop() {
594 let node = &mut expression.node;
595 match node {
596 PlanNode::Union {
597 inputs,
598 consolidate_output,
599 ..
600 } => {
601 if inputs
602 .iter()
603 .any(|input| matches!(input.node, PlanNode::Negate { .. }))
604 {
605 *consolidate_output = true;
606 }
607 }
608 _ => {}
609 }
610 todo.extend(node.children_mut());
611 }
612 }
613 mz_repr::explain::trace_plan(dataflow);
614 }
615
616 /// Refines the plans of objects to be built as part of `dataflow` to take advantage
617 /// of monotonic operators if the dataflow refers to a single-time, i.e., is for a
618 /// one-shot SELECT query.
619 #[mz_ore::instrument(
620 target = "optimizer",
621 level = "debug",
622 fields(path.segment = "refine_single_time_operator_selection")
623 )]
624 fn refine_single_time_operator_selection(dataflow: &mut DataflowDescription<Self>) {
625 // We should only reach here if we have a one-shot SELECT query, i.e.,
626 // a single-time dataflow.
627 assert!(dataflow.is_single_time());
628
629 // Upgrade single-time plans to monotonic.
630 for build_desc in dataflow.objects_to_build.iter_mut() {
631 let mut todo = vec![&mut build_desc.plan];
632 while let Some(expression) = todo.pop() {
633 let node = &mut expression.node;
634 match node {
635 PlanNode::Reduce { plan, .. } => {
636 // Upgrade non-monotonic hierarchical plans to monotonic with mandatory consolidation.
637 match plan {
638 ReducePlan::Collation(collation) => {
639 collation.as_monotonic(true);
640 }
641 ReducePlan::Hierarchical(hierarchical) => {
642 hierarchical.as_monotonic(true);
643 }
644 _ => {
645 // Nothing to do for other plans, and doing nothing is safe for future variants.
646 }
647 }
648 todo.extend(node.children_mut());
649 }
650 PlanNode::TopK { top_k_plan, .. } => {
651 top_k_plan.as_monotonic(true);
652 todo.extend(node.children_mut());
653 }
654 PlanNode::LetRec { body, .. } => {
655 // Only the non-recursive `body` is restricted to a single time.
656 todo.push(body);
657 }
658 _ => {
659 // Nothing to do for other expressions, and doing nothing is safe for future expressions.
660 todo.extend(node.children_mut());
661 }
662 }
663 }
664 }
665 mz_repr::explain::trace_plan(dataflow);
666 }
667
668 /// Refines the plans of objects to be built as part of a single-time `dataflow` to relax
669 /// the setting of the `must_consolidate` attribute of monotonic operators, if necessary,
670 /// whenever the input is deemed to be physically monotonic.
671 #[mz_ore::instrument(
672 target = "optimizer",
673 level = "debug",
674 fields(path.segment = "refine_single_time_consolidation")
675 )]
676 fn refine_single_time_consolidation(
677 dataflow: &mut DataflowDescription<Self>,
678 config: &TransformConfig,
679 ) -> Result<(), String> {
680 // We should only reach here if we have a one-shot SELECT query, i.e.,
681 // a single-time dataflow.
682 assert!(dataflow.is_single_time());
683
684 let transform = transform::RelaxMustConsolidate::<T>::new();
685 for build_desc in dataflow.objects_to_build.iter_mut() {
686 transform
687 .transform(config, &mut build_desc.plan)
688 .map_err(|_| "Maximum recursion limit error in consolidation relaxation.")?;
689 }
690 mz_repr::explain::trace_plan(dataflow);
691 Ok(())
692 }
693}
694
695impl<T> CollectionPlan for PlanNode<T> {
696 fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
697 match self {
698 PlanNode::Constant { rows: _ } => (),
699 PlanNode::Get {
700 id,
701 keys: _,
702 plan: _,
703 } => match id {
704 Id::Global(id) => {
705 out.insert(*id);
706 }
707 Id::Local(_) => (),
708 },
709 PlanNode::Let { id: _, value, body } => {
710 value.depends_on_into(out);
711 body.depends_on_into(out);
712 }
713 PlanNode::LetRec {
714 ids: _,
715 values,
716 limits: _,
717 body,
718 } => {
719 for value in values.iter() {
720 value.depends_on_into(out);
721 }
722 body.depends_on_into(out);
723 }
724 PlanNode::Join { inputs, plan: _ }
725 | PlanNode::Union {
726 inputs,
727 consolidate_output: _,
728 } => {
729 for input in inputs {
730 input.depends_on_into(out);
731 }
732 }
733 PlanNode::Mfp {
734 input,
735 mfp: _,
736 input_key_val: _,
737 }
738 | PlanNode::FlatMap {
739 input_key: _,
740 input,
741 exprs: _,
742 func: _,
743 mfp_after: _,
744 }
745 | PlanNode::ArrangeBy {
746 input_key: _,
747 input,
748 input_mfp: _,
749 forms: _,
750 }
751 | PlanNode::Reduce {
752 input_key: _,
753 input,
754 key_val_plan: _,
755 plan: _,
756 mfp_after: _,
757 }
758 | PlanNode::TopK {
759 input,
760 top_k_plan: _,
761 }
762 | PlanNode::Negate { input }
763 | PlanNode::Threshold {
764 input,
765 threshold_plan: _,
766 } => {
767 input.depends_on_into(out);
768 }
769 }
770 }
771}
772
773impl<T> CollectionPlan for Plan<T> {
774 fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
775 self.node.depends_on_into(out);
776 }
777}
778
/// Returns bucket sizes, descending, suitable for hierarchical decomposition of an operator, based
/// on the expected number of rows that will have the same group key.
///
/// The buckets are the powers of 16 that are strictly smaller than the expected group size. If no
/// group size is given, 4B records are assumed. A group size of at most 16 yields no buckets.
fn bucketing_of_expected_group_size(expected_group_size: Option<u64>) -> Vec<u64> {
    // NOTE(vmarcos): The fan-in of 16 defined below is used in the tuning advice built-in view
    // mz_introspection.mz_expected_group_size_advice.
    const FAN_IN: u64 = 16;

    // Plan for 4B records in the expected case if the user didn't specify a group size.
    let limit = expected_group_size.unwrap_or(4_000_000_000);

    // Distribute buckets in powers of 16, so that we can strike a balance between how many inputs
    // each layer gets from the preceding layer, while also limiting the number of layers.
    // The multiplication saturates at `u64::MAX`, which is never below `limit`, so the sequence
    // always ends.
    let mut buckets: Vec<u64> =
        std::iter::successors(Some(FAN_IN), |bucket| Some(bucket.saturating_mul(FAN_IN)))
            .take_while(|&bucket| bucket < limit)
            .collect();

    // The sequence was generated in ascending order; callers expect it descending.
    buckets.reverse();
    buckets
}