mz_compute_types/plan.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An explicit representation of a rendering plan for provided dataflows.
11
12#![warn(missing_debug_implementations)]
13
14use std::collections::{BTreeMap, BTreeSet};
15
16use columnar::Columnar;
17use mz_expr::{
18 CollectionPlan, EvalError, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr,
19 OptimizedMirRelationExpr, TableFunc,
20};
21use mz_ore::soft_assert_eq_no_log;
22use mz_ore::str::Indent;
23use mz_repr::explain::text::text_string_at;
24use mz_repr::explain::{DummyHumanizer, ExplainConfig, ExprHumanizer, PlanRenderingContext};
25use mz_repr::optimize::OptimizerFeatures;
26use mz_repr::{Diff, GlobalId, Row, Timestamp};
27use serde::{Deserialize, Serialize};
28
29use crate::dataflows::DataflowDescription;
30use crate::plan::join::JoinPlan;
31use crate::plan::reduce::{KeyValPlan, ReducePlan};
32use crate::plan::threshold::ThresholdPlan;
33use crate::plan::top_k::TopKPlan;
34use crate::plan::transform::{Transform, TransformConfig};
35
36mod lowering;
37
38pub mod interpret;
39pub mod join;
40pub mod reduce;
41pub mod render_plan;
42pub mod threshold;
43pub mod top_k;
44pub mod transform;
45
46/// The forms in which an operator's output is available.
47///
48/// These forms may include "raw", meaning as a streamed collection, but also any
49/// number of "arranged" representations.
50///
51/// Each arranged representation is described by a `KeyValRowMapping`, or rather
52/// at the moment by its three fields in a triple. These fields explain how to form
53/// a "key" by applying some expressions to each row, how to select "values" from
54/// columns not explicitly captured by the key, and how to return to the original
55/// row from the concatenation of key and value. Further explanation is available
56/// in the documentation for `KeyValRowMapping`.
57#[derive(
58 Clone,
59 Debug,
60 Default,
61 Deserialize,
62 Eq,
63 Ord,
64 PartialEq,
65 PartialOrd,
66 Serialize
67)]
68pub struct AvailableCollections {
69 /// Whether the collection exists in unarranged form.
70 pub raw: bool,
71 /// The list of available arrangements, presented as a `KeyValRowMapping`,
72 /// but here represented by a triple `(to_key, to_val, to_row)` instead.
73 /// The documentation for `KeyValRowMapping` explains these fields better.
74 pub arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>,
75}
76
77impl AvailableCollections {
78 /// Represent a collection that has no arrangements.
79 pub fn new_raw() -> Self {
80 Self {
81 raw: true,
82 arranged: Vec::new(),
83 }
84 }
85
86 /// Represent a collection that is arranged in the specified ways.
87 pub fn new_arranged(arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>) -> Self {
88 assert!(
89 !arranged.is_empty(),
90 "Invariant violated: at least one collection must exist"
91 );
92 Self {
93 raw: false,
94 arranged,
95 }
96 }
97
98 /// Get some arrangement, if one exists.
99 pub fn arbitrary_arrangement(&self) -> Option<&(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
100 assert!(
101 self.raw || !self.arranged.is_empty(),
102 "Invariant violated: at least one collection must exist"
103 );
104 self.arranged.get(0)
105 }
106}
107
108/// How to render the arrangements requested by an `ArrangeBy`.
109///
110/// Decided during LIR lowering and consumed by the renderer. The variant says what the
111/// renderer will do, not what it knows about the input.
112#[derive(
113 Clone,
114 Copy,
115 Debug,
116 Deserialize,
117 Eq,
118 Ord,
119 PartialEq,
120 PartialOrd,
121 Serialize
122)]
123pub enum ArrangementStrategy {
124 /// Form arrangements directly from the input collection.
125 Direct,
126 /// Insert temporal bucketing in front of the arrangement, to delay future-stamped
127 /// updates (e.g., from `mz_now()` MFPs) until their bucket boundary releases them.
128 /// Honoured only when `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is set; otherwise behaves like
129 /// `Direct`.
130 TemporalBucketing,
131}
132
133impl std::fmt::Display for ArrangementStrategy {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 match self {
136 ArrangementStrategy::Direct => write!(f, "Direct"),
137 ArrangementStrategy::TemporalBucketing => write!(f, "TemporalBucketing"),
138 }
139 }
140}
141
142/// An identifier for an LIR node.
143#[derive(
144 Clone,
145 Copy,
146 Debug,
147 Deserialize,
148 Eq,
149 Ord,
150 PartialEq,
151 PartialOrd,
152 Serialize,
153 Columnar
154)]
155pub struct LirId(u64);
156
157impl LirId {
158 fn as_u64(&self) -> u64 {
159 self.0
160 }
161}
162
163impl From<LirId> for u64 {
164 fn from(value: LirId) -> Self {
165 value.as_u64()
166 }
167}
168
169impl std::fmt::Display for LirId {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 write!(f, "{}", self.0)
172 }
173}
174
175/// A rendering plan with as much conditional logic as possible removed.
176#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
177pub struct Plan {
178 /// A dataflow-local identifier.
179 pub lir_id: LirId,
180 /// The underlying operator.
181 pub node: PlanNode,
182}
183
184/// The actual AST node of the `Plan`.
185#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
186pub enum PlanNode {
187 /// A collection containing a pre-determined collection.
188 Constant {
189 /// Explicit update triples for the collection.
190 rows: Result<Vec<(Row, Timestamp, Diff)>, EvalError>,
191 },
192 /// A reference to a bound collection.
193 ///
194 /// This is commonly either an external reference to an existing source or
195 /// maintained arrangement, or an internal reference to a `Let` identifier.
196 Get {
197 /// A global or local identifier naming the collection.
198 id: Id,
199 /// Arrangements that will be available.
200 ///
201 /// The collection will also be loaded if available, which it will
202 /// not be for imported data, but which it may be for locally defined
203 /// data.
204 // TODO: Be more explicit about whether a collection is available,
205 // although one can always produce it from an arrangement, and it
206 // seems generally advantageous to do that instead (to avoid cloning
207 // rows, by using `mfp` first on borrowed data).
208 keys: AvailableCollections,
209 /// The actions to take when introducing the collection.
210 plan: GetPlan,
211 },
212 /// Binds `value` to `id`, and then results in `body` with that binding.
213 ///
214 /// This stage has the effect of sharing `value` across multiple possible
215 /// uses in `body`, and is the only mechanism we have for sharing collection
216 /// information across parts of a dataflow.
217 ///
218 /// The binding is not available outside of `body`.
219 Let {
220 /// The local identifier to be used, available to `body` as `Id::Local(id)`.
221 id: LocalId,
222 /// The collection that should be bound to `id`.
223 value: Box<Plan>,
224 /// The collection that results, which is allowed to contain `Get` stages
225 /// that reference `Id::Local(id)`.
226 body: Box<Plan>,
227 },
228 /// Binds `values` to `ids`, evaluates them potentially recursively, and returns `body`.
229 ///
230 /// All bindings are available to all bindings, and to `body`.
231 /// The contents of each binding are initially empty, and then updated through a sequence
232 /// of iterations in which each binding is updated in sequence, from the most recent values
233 /// of all bindings.
234 LetRec {
235 /// The local identifiers to be used, available to `body` as `Id::Local(id)`.
236 ids: Vec<LocalId>,
237 /// The collection that should be bound to `id`.
238 values: Vec<Plan>,
239 /// Maximum number of iterations. See further info on the MIR `LetRec`.
240 limits: Vec<Option<LetRecLimit>>,
241 /// The collection that results, which is allowed to contain `Get` stages
242 /// that reference `Id::Local(id)`.
243 body: Box<Plan>,
244 },
245 /// Map, Filter, and Project operators.
246 ///
247 /// This stage contains work that we would ideally like to fuse to other plan
248 /// stages, but for practical reasons cannot. For example: threshold, topk,
249 /// and sometimes reduce stages are not able to absorb this operator.
250 Mfp {
251 /// The input collection.
252 input: Box<Plan>,
253 /// Linear operator to apply to each record.
254 mfp: MapFilterProject,
255 /// Whether the input is from an arrangement, and if so,
256 /// whether we can seek to a specific value therein
257 input_key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
258 },
259 /// A variable number of output records for each input record.
260 ///
261 /// This stage is a bit of a catch-all for logic that does not easily fit in
262 /// map stages. This includes table valued functions, but also functions of
263 /// multiple arguments, and functions that modify the sign of updates.
264 ///
265 /// This stage allows a `MapFilterProject` operator to be fused to its output,
266 /// and this can be very important as otherwise the output of `func` is just
267 /// appended to the input record, for as many outputs as it has. This has the
268 /// unpleasant default behavior of repeating potentially large records that
269 /// are being unpacked, producing quadratic output in those cases. Instead,
270 /// in these cases use a `mfp` member that projects away these large fields.
271 FlatMap {
272 /// The particular arrangement of the input we expect to use,
273 /// if any
274 input_key: Option<Vec<MirScalarExpr>>,
275 /// The input collection.
276 input: Box<Plan>,
277 /// Expressions that for each row prepare the arguments to `func`.
278 exprs: Vec<MirScalarExpr>,
279 /// The variable-record emitting function.
280 func: TableFunc,
281 /// Linear operator to apply to each record produced by `func`.
282 mfp_after: MapFilterProject,
283 },
284 /// A multiway relational equijoin, with fused map, filter, and projection.
285 ///
286 /// This stage performs a multiway join among `inputs`, using the equality
287 /// constraints expressed in `plan`. The plan also describes the implementation
288 /// strategy we will use, and any pushed down per-record work.
289 Join {
290 /// An ordered list of inputs that will be joined.
291 inputs: Vec<Plan>,
292 /// Detailed information about the implementation of the join.
293 ///
294 /// This includes information about the implementation strategy, but also
295 /// any map, filter, project work that we might follow the join with, but
296 /// potentially pushed down into the implementation of the join.
297 plan: JoinPlan,
298 },
299 /// Aggregation by key.
300 Reduce {
301 /// The particular arrangement of the input we expect to use,
302 /// if any
303 input_key: Option<Vec<MirScalarExpr>>,
304 /// The input collection.
305 input: Box<Plan>,
306 /// A plan for changing input records into key, value pairs.
307 key_val_plan: KeyValPlan,
308 /// A plan for performing the reduce.
309 ///
310 /// The implementation of reduction has several different strategies based
311 /// on the properties of the reduction, and the input itself. Please check
312 /// out the documentation for this type for more detail.
313 plan: ReducePlan,
314 /// An MFP that must be applied to results. The projection part of this
315 /// MFP must preserve the key for the reduction; otherwise, the results
316 /// become undefined. Additionally, the MFP must be free from temporal
317 /// predicates so that it can be readily evaluated.
318 /// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]?
319 mfp_after: MapFilterProject,
320 /// Strategy for forming the internal input arrangement built by `Reduce`
321 /// (materialized via `key_val_plan`).
322 ///
323 /// Set by the lowering from the input's `has_future_updates` flag. The
324 /// renderer applies it to the keyed `(key, val)` stream feeding the
325 /// reduce. See `render_reduce` for the rationale on why this is
326 /// plumbed through `Reduce` rather than handled at the arrangement site.
327 ///
328 /// Note: unrelated to the hash buckets used by hierarchical reductions
329 /// (e.g. `ReducePlan::Hierarchical`'s `buckets`), which are an internal
330 /// sharding scheme for `min`/`max`-style aggregations. Here "bucketing"
331 /// refers exclusively to temporal (time-domain) bucketing of
332 /// future-stamped updates.
333 temporal_bucketing_strategy: ArrangementStrategy,
334 },
335 /// Key-based "Top K" operator, retaining the first K records in each group.
336 TopK {
337 /// The input collection.
338 input: Box<Plan>,
339 /// A plan for performing the Top-K.
340 ///
341 /// The implementation of reduction has several different strategies based
342 /// on the properties of the reduction, and the input itself. Please check
343 /// out the documentation for this type for more detail.
344 top_k_plan: TopKPlan,
345 /// Strategy for bucketing the input collection ahead of the Top-K operator.
346 ///
347 /// Set by the lowering from the input's `has_future_updates` flag. The
348 /// renderer applies it to the per-row input stream at the top of
349 /// `render_topk`, covering all three `TopKPlan` arms uniformly. See
350 /// `PlanNode::Reduce::temporal_bucketing_strategy` for the underlying
351 /// convention.
352 temporal_bucketing_strategy: ArrangementStrategy,
353 },
354 /// Inverts the sign of each update.
355 Negate {
356 /// The input collection.
357 input: Box<Plan>,
358 },
359 /// Filters records that accumulate negatively.
360 ///
361 /// Although the operator suppresses updates, it is a stateful operator taking
362 /// resources proportional to the number of records with non-zero accumulation.
363 Threshold {
364 /// The input collection.
365 input: Box<Plan>,
366 /// A plan for performing the threshold.
367 ///
368 /// The implementation of reduction has several different strategies based
369 /// on the properties of the reduction, and the input itself. Please check
370 /// out the documentation for this type for more detail.
371 threshold_plan: ThresholdPlan,
372 },
373 /// Adds the contents of the input collections.
374 ///
375 /// Importantly, this is *multiset* union, so the multiplicities of records will
376 /// add. This is in contrast to *set* union, where the multiplicities would be
377 /// capped at one. A set union can be formed with `Union` followed by `Reduce`
378 /// implementing the "distinct" operator.
379 Union {
380 /// The input collections
381 inputs: Vec<Plan>,
382 /// Whether to consolidate the output, e.g., cancel negated records.
383 consolidate_output: bool,
384 /// Per-input bucketing strategies. Lockstep with `inputs`: index `i` is the
385 /// strategy applied to `inputs[i]` before concatenation.
386 ///
387 /// Set by the lowering from each input's `has_future_updates` flag. Only
388 /// consolidating Unions (`consolidate_output: true`) carry non-`Direct`
389 /// entries, because bucketing only pays off ahead of a consolidating
390 /// downstream operator. See `PlanNode::Reduce::temporal_bucketing_strategy`
391 /// for the underlying convention.
392 temporal_bucketing_strategies: Vec<ArrangementStrategy>,
393 },
394 /// The `input` plan, but with additional arrangements.
395 ///
396 /// This operator does not change the logical contents of `input`, but ensures
397 /// that certain arrangements are available in the results. This operator can
398 /// be important for e.g. the `Join` stage which benefits from multiple arrangements
399 /// or to cap a `Plan` so that indexes can be exported.
400 ArrangeBy {
401 /// The key that must be used to access the input.
402 input_key: Option<Vec<MirScalarExpr>>,
403 /// The input collection.
404 input: Box<Plan>,
405 /// The MFP that must be applied to the input.
406 input_mfp: MapFilterProject,
407 /// A list of arrangement keys, and possibly a raw collection,
408 /// that will be added to those of the input. Does not include
409 /// any other existing arrangements.
410 forms: AvailableCollections,
411 /// How the renderer should form the arrangements requested by `forms`.
412 strategy: ArrangementStrategy,
413 },
414}
415
416impl PlanNode {
417 /// Iterates through references to child expressions.
418 pub fn children(&self) -> impl Iterator<Item = &Plan> {
419 let mut first = None;
420 let mut second = None;
421 let mut rest = None;
422 let mut last = None;
423
424 use PlanNode::*;
425 match self {
426 Constant { .. } | Get { .. } => (),
427 Let { value, body, .. } => {
428 first = Some(&**value);
429 second = Some(&**body);
430 }
431 LetRec { values, body, .. } => {
432 rest = Some(values);
433 last = Some(&**body);
434 }
435 Mfp { input, .. }
436 | FlatMap { input, .. }
437 | Reduce { input, .. }
438 | TopK { input, .. }
439 | Negate { input, .. }
440 | Threshold { input, .. }
441 | ArrangeBy { input, .. } => {
442 first = Some(&**input);
443 }
444 Join { inputs, .. } | Union { inputs, .. } => {
445 rest = Some(inputs);
446 }
447 }
448
449 first
450 .into_iter()
451 .chain(second)
452 .chain(rest.into_iter().flatten())
453 .chain(last)
454 }
455
456 /// Iterates through mutable references to child expressions.
457 pub fn children_mut(&mut self) -> impl Iterator<Item = &mut Plan> {
458 let mut first = None;
459 let mut second = None;
460 let mut rest = None;
461 let mut last = None;
462
463 use PlanNode::*;
464 match self {
465 Constant { .. } | Get { .. } => (),
466 Let { value, body, .. } => {
467 first = Some(&mut **value);
468 second = Some(&mut **body);
469 }
470 LetRec { values, body, .. } => {
471 rest = Some(values);
472 last = Some(&mut **body);
473 }
474 Mfp { input, .. }
475 | FlatMap { input, .. }
476 | Reduce { input, .. }
477 | TopK { input, .. }
478 | Negate { input, .. }
479 | Threshold { input, .. }
480 | ArrangeBy { input, .. } => {
481 first = Some(&mut **input);
482 }
483 Join { inputs, .. } | Union { inputs, .. } => {
484 rest = Some(inputs);
485 }
486 }
487
488 first
489 .into_iter()
490 .chain(second)
491 .chain(rest.into_iter().flatten())
492 .chain(last)
493 }
494}
495
496impl PlanNode {
497 /// Attach an `lir_id` to a `PlanNode` to make a complete `Plan`.
498 pub fn as_plan(self, lir_id: LirId) -> Plan {
499 Plan { lir_id, node: self }
500 }
501}
502
503impl Plan {
504 /// Pretty-print this [Plan] to a string.
505 pub fn pretty(&self) -> String {
506 let config = ExplainConfig::default();
507 self.debug_explain(&config, None)
508 }
509
510 /// Pretty-print this [Plan] to a string using a custom
511 /// [ExplainConfig] and an optionally provided [ExprHumanizer].
512 /// This is intended for debugging and tests, not users.
513 pub fn debug_explain(
514 &self,
515 config: &ExplainConfig,
516 humanizer: Option<&dyn ExprHumanizer>,
517 ) -> String {
518 text_string_at(self, || PlanRenderingContext {
519 indent: Indent::default(),
520 humanizer: humanizer.unwrap_or(&DummyHumanizer),
521 annotations: BTreeMap::default(),
522 config,
523 ambiguous_ids: BTreeSet::default(),
524 })
525 }
526}
527
528/// How a `Get` stage will be rendered.
529#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
530pub enum GetPlan {
531 /// Simply pass input arrangements on to the next stage.
532 PassArrangements,
533 /// Using the supplied key, optionally seek the row, and apply the MFP.
534 Arrangement(Vec<MirScalarExpr>, Option<Row>, MapFilterProject),
535 /// Scan the input collection (unarranged) and apply the MFP.
536 Collection(MapFilterProject),
537}
538
539impl Plan {
540 /// Convert the dataflow description into one that uses render plans.
541 #[mz_ore::instrument(
542 target = "optimizer",
543 level = "debug",
544 fields(path.segment = "finalize_dataflow")
545 )]
546 pub fn finalize_dataflow(
547 desc: DataflowDescription<OptimizedMirRelationExpr>,
548 features: &OptimizerFeatures,
549 ) -> Result<DataflowDescription<Self>, String> {
550 // First, we lower the dataflow description from MIR to LIR.
551 let mut dataflow = Self::lower_dataflow(desc, features)?;
552
553 // Subsequently, we perform plan refinements for the dataflow.
554 Self::refine_source_mfps(&mut dataflow);
555
556 // Note: `consolidate_output` for `Union` and per-input
557 // `temporal_bucketing_strategies` are decided at lowering time (see the
558 // `Union` arm of `lower_mir_expr_stack_safe`). The pre-existing
559 // `refine_union_negate_consolidation` pass — which used to flip
560 // `consolidate_output` to `true` for Unions with a `Negate` child — has
561 // been folded into the lowering, since lowering is the only point where
562 // the bucketing decision (which depends on `has_future_updates`) is
563 // available.
564
565 if dataflow.is_single_time() {
566 Self::refine_single_time_operator_selection(&mut dataflow);
567
568 // The relaxation of the `must_consolidate` flag performs an LIR-based
569 // analysis and transform under checked recursion. By a similar argument
570 // made in `from_mir`, we do not expect the recursion limit to be hit.
571 // However, if that happens, we propagate an error to the caller.
572 // To apply the transform, we first obtain monotonic source and index
573 // global IDs and add them to a `TransformConfig` instance.
574 let monotonic_ids = dataflow
575 .source_imports
576 .iter()
577 .filter_map(|(id, source_import)| source_import.monotonic.then_some(*id))
578 .chain(
579 dataflow
580 .index_imports
581 .iter()
582 .filter_map(|(_id, index_import)| {
583 if index_import.monotonic {
584 Some(index_import.desc.on_id)
585 } else {
586 None
587 }
588 }),
589 )
590 .collect::<BTreeSet<_>>();
591
592 let config = TransformConfig { monotonic_ids };
593 Self::refine_single_time_consolidation(&mut dataflow, &config)?;
594 }
595
596 soft_assert_eq_no_log!(dataflow.check_invariants(), Ok(()));
597
598 mz_repr::explain::trace_plan(&dataflow);
599
600 Ok(dataflow)
601 }
602
603 /// Lowers the dataflow description from MIR to LIR. To this end, the
604 /// method collects all available arrangements and based on this information
605 /// creates plans for every object to be built for the dataflow.
606 #[mz_ore::instrument(
607 target = "optimizer",
608 level = "debug",
609 fields(path.segment ="mir_to_lir")
610 )]
611 fn lower_dataflow(
612 desc: DataflowDescription<OptimizedMirRelationExpr>,
613 features: &OptimizerFeatures,
614 ) -> Result<DataflowDescription<Self>, String> {
615 let context = lowering::Context::new(desc.debug_name.clone(), features);
616 let dataflow = context.lower(desc)?;
617
618 mz_repr::explain::trace_plan(&dataflow);
619
620 Ok(dataflow)
621 }
622
623 /// Refines the source instance descriptions for sources imported by `dataflow` to
624 /// push down common MFP expressions.
625 #[mz_ore::instrument(
626 target = "optimizer",
627 level = "debug",
628 fields(path.segment = "refine_source_mfps")
629 )]
630 fn refine_source_mfps(dataflow: &mut DataflowDescription<Self>) {
631 // Extract MFPs from Get operators for sources, and extract what we can for the source.
632 // For each source, we want to find `&mut MapFilterProject` for each `Get` expression.
633 for (source_id, source_import) in dataflow.source_imports.iter_mut() {
634 let source = &mut source_import.desc;
635 let mut identity_present = false;
636 let mut mfps = Vec::new();
637 for build_desc in dataflow.objects_to_build.iter_mut() {
638 let mut todo = vec![&mut build_desc.plan];
639 while let Some(expression) = todo.pop() {
640 let node = &mut expression.node;
641 if let PlanNode::Get { id, plan, .. } = node {
642 if *id == mz_expr::Id::Global(*source_id) {
643 match plan {
644 GetPlan::Collection(mfp) => mfps.push(mfp),
645 GetPlan::PassArrangements => {
646 identity_present = true;
647 }
648 GetPlan::Arrangement(..) => {
649 panic!("Surprising `GetPlan` for imported source: {:?}", plan);
650 }
651 }
652 }
653 } else {
654 todo.extend(node.children_mut());
655 }
656 }
657 }
658
659 // Direct exports of sources are possible, and prevent pushdown.
660 identity_present |= dataflow
661 .index_exports
662 .values()
663 .any(|(x, _)| x.on_id == *source_id);
664 identity_present |= dataflow.sink_exports.values().any(|x| x.from == *source_id);
665
666 if !identity_present && !mfps.is_empty() {
667 // Extract a common prefix `MapFilterProject` from `mfps`.
668 let common = MapFilterProject::extract_common(&mut mfps[..]);
669 // Apply common expressions to the source's `MapFilterProject`.
670 let mut mfp = if let Some(mfp) = source.arguments.operators.take() {
671 MapFilterProject::compose(mfp, common)
672 } else {
673 common
674 };
675 mfp.optimize();
676 source.arguments.operators = Some(mfp);
677 }
678 }
679 mz_repr::explain::trace_plan(dataflow);
680 }
681
682 /// Refines the plans of objects to be built as part of `dataflow` to take advantage
683 /// of monotonic operators if the dataflow refers to a single-time, i.e., is for a
684 /// one-shot SELECT query.
685 #[mz_ore::instrument(
686 target = "optimizer",
687 level = "debug",
688 fields(path.segment = "refine_single_time_operator_selection")
689 )]
690 fn refine_single_time_operator_selection(dataflow: &mut DataflowDescription<Self>) {
691 // We should only reach here if we have a one-shot SELECT query, i.e.,
692 // a single-time dataflow.
693 assert!(dataflow.is_single_time());
694
695 // Upgrade single-time plans to monotonic.
696 for build_desc in dataflow.objects_to_build.iter_mut() {
697 let mut todo = vec![&mut build_desc.plan];
698 while let Some(expression) = todo.pop() {
699 let node = &mut expression.node;
700 match node {
701 PlanNode::Reduce { plan, .. } => {
702 // Upgrade non-monotonic hierarchical plans to monotonic with mandatory consolidation.
703 match plan {
704 ReducePlan::Hierarchical(hierarchical) => {
705 hierarchical.as_monotonic(true);
706 }
707 _ => {
708 // Nothing to do for other plans, and doing nothing is safe for future variants.
709 }
710 }
711 todo.extend(node.children_mut());
712 }
713 PlanNode::TopK { top_k_plan, .. } => {
714 top_k_plan.as_monotonic(true);
715 todo.extend(node.children_mut());
716 }
717 PlanNode::LetRec { body, .. } => {
718 // Only the non-recursive `body` is restricted to a single time.
719 todo.push(body);
720 }
721 _ => {
722 // Nothing to do for other expressions, and doing nothing is safe for future expressions.
723 todo.extend(node.children_mut());
724 }
725 }
726 }
727 }
728 mz_repr::explain::trace_plan(dataflow);
729 }
730
731 /// Refines the plans of objects to be built as part of a single-time `dataflow` to relax
732 /// the setting of the `must_consolidate` attribute of monotonic operators, if necessary,
733 /// whenever the input is deemed to be physically monotonic.
734 #[mz_ore::instrument(
735 target = "optimizer",
736 level = "debug",
737 fields(path.segment = "refine_single_time_consolidation")
738 )]
739 fn refine_single_time_consolidation(
740 dataflow: &mut DataflowDescription<Self>,
741 config: &TransformConfig,
742 ) -> Result<(), String> {
743 // We should only reach here if we have a one-shot SELECT query, i.e.,
744 // a single-time dataflow.
745 assert!(dataflow.is_single_time());
746
747 let transform = transform::RelaxMustConsolidate;
748 for build_desc in dataflow.objects_to_build.iter_mut() {
749 transform
750 .transform(config, &mut build_desc.plan)
751 .map_err(|_| "Maximum recursion limit error in consolidation relaxation.")?;
752 }
753 mz_repr::explain::trace_plan(dataflow);
754 Ok(())
755 }
756}
757
758impl CollectionPlan for PlanNode {
759 fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
760 match self {
761 PlanNode::Constant { rows: _ } => (),
762 PlanNode::Get {
763 id,
764 keys: _,
765 plan: _,
766 } => match id {
767 Id::Global(id) => {
768 out.insert(*id);
769 }
770 Id::Local(_) => (),
771 },
772 PlanNode::Let { id: _, value, body } => {
773 value.depends_on_into(out);
774 body.depends_on_into(out);
775 }
776 PlanNode::LetRec {
777 ids: _,
778 values,
779 limits: _,
780 body,
781 } => {
782 for value in values.iter() {
783 value.depends_on_into(out);
784 }
785 body.depends_on_into(out);
786 }
787 PlanNode::Join { inputs, plan: _ }
788 | PlanNode::Union {
789 inputs,
790 consolidate_output: _,
791 temporal_bucketing_strategies: _,
792 } => {
793 for input in inputs {
794 input.depends_on_into(out);
795 }
796 }
797 PlanNode::Mfp {
798 input,
799 mfp: _,
800 input_key_val: _,
801 }
802 | PlanNode::FlatMap {
803 input_key: _,
804 input,
805 exprs: _,
806 func: _,
807 mfp_after: _,
808 }
809 | PlanNode::ArrangeBy {
810 input_key: _,
811 input,
812 input_mfp: _,
813 forms: _,
814 strategy: _,
815 }
816 | PlanNode::Reduce {
817 input_key: _,
818 input,
819 key_val_plan: _,
820 plan: _,
821 mfp_after: _,
822 temporal_bucketing_strategy: _,
823 }
824 | PlanNode::TopK {
825 input,
826 top_k_plan: _,
827 temporal_bucketing_strategy: _,
828 }
829 | PlanNode::Negate { input }
830 | PlanNode::Threshold {
831 input,
832 threshold_plan: _,
833 } => {
834 input.depends_on_into(out);
835 }
836 }
837 }
838}
839
840impl CollectionPlan for Plan {
841 fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
842 self.node.depends_on_into(out);
843 }
844}
845
846/// Returns bucket sizes, descending, suitable for hierarchical decomposition of an operator, based
847/// on the expected number of rows that will have the same group key.
848fn bucketing_of_expected_group_size(expected_group_size: Option<u64>) -> Vec<u64> {
849 // NOTE(vmarcos): The fan-in of 16 defined below is used in the tuning advice built-in view
850 // mz_introspection.mz_expected_group_size_advice.
851 let mut buckets = vec![];
852 let mut current = 16;
853
854 // Plan for 4B records in the expected case if the user didn't specify a group size.
855 let limit = expected_group_size.unwrap_or(4_000_000_000);
856
857 // Distribute buckets in powers of 16, so that we can strike a balance between how many inputs
858 // each layer gets from the preceding layer, while also limiting the number of layers.
859 while current < limit {
860 buckets.push(current);
861 current = current.saturating_mul(16);
862 }
863
864 buckets.reverse();
865 buckets
866}