// mz_sql/plan/statement/dml.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data manipulation language (DML).
11//!
12//! This module houses the handlers for statements that manipulate data, like
13//! `INSERT`, `SELECT`, `SUBSCRIBE`, and `COPY`.
14
15use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19
20use mz_arrow_util::builder::ArrowBuilder;
21use mz_expr::visit::Visit;
22use mz_expr::{MirRelationExpr, RowSetFinishing};
23use mz_ore::num::NonNeg;
24use mz_ore::soft_panic_or_log;
25use mz_ore::str::separated;
26use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
27use mz_repr::adt::numeric::NumericMaxScale;
28use mz_repr::bytes::ByteSize;
29use mz_repr::explain::{ExplainConfig, ExplainFormat};
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
32use mz_sql_parser::ast::{
33    CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
34    ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
35    ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
36    ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
37    SetExpr, SubscribeOutput, UnresolvedItemName,
38};
39use mz_sql_parser::ident;
40use mz_storage_types::sinks::{
41    KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
42    MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
43};
44
45use crate::ast::display::AstDisplay;
46use crate::ast::{
47    AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
48    DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
49    SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
50    UpdateStatement,
51};
52use crate::catalog::CatalogItemType;
53use crate::names::{Aug, ResolvedItemName};
54use crate::normalize;
55use crate::plan::query::{
56    ExprContext, QueryLifetime, offset_into_value, plan_as_of_or_up_to, plan_expr,
57};
58use crate::plan::scope::Scope;
59use crate::plan::statement::show::ShowSelect;
60use crate::plan::statement::{StatementContext, StatementDesc, ddl};
61use crate::plan::{
62    self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
63    ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
64};
65use crate::plan::{
66    CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
67    QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
68};
69use crate::plan::{CopyFromSource, with_options};
70use crate::session::vars::{
71    self, DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF, ENABLE_COPY_FROM_REMOTE,
72};
73
74// TODO(benesch): currently, describing a `SELECT` or `INSERT` query
75// plans the whole query to determine its shape and parameter types,
76// and then throws away that plan. If we were smarter, we'd stash that
77// plan somewhere so we don't have to recompute it when the query is
78// executed.
79
80pub fn describe_insert(
81    scx: &StatementContext,
82    InsertStatement {
83        table_name,
84        columns,
85        source,
86        returning,
87    }: InsertStatement<Aug>,
88) -> Result<StatementDesc, PlanError> {
89    let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
90    let desc = if returning.expr.is_empty() {
91        None
92    } else {
93        Some(returning.desc)
94    };
95    Ok(StatementDesc::new(desc))
96}
97
98pub fn plan_insert(
99    scx: &StatementContext,
100    InsertStatement {
101        table_name,
102        columns,
103        source,
104        returning,
105    }: InsertStatement<Aug>,
106    params: &Params,
107) -> Result<Plan, PlanError> {
108    let (id, mut expr, returning) =
109        query::plan_insert_query(scx, table_name, columns, source, returning)?;
110    expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
111    let returning = returning
112        .expr
113        .into_iter()
114        .map(|mut expr| {
115            expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
116            expr.lower_uncorrelated(scx.catalog.system_vars())
117        })
118        .collect::<Result<Vec<_>, _>>()?;
119
120    Ok(Plan::Insert(InsertPlan {
121        id,
122        values: expr,
123        returning,
124    }))
125}
126
127pub fn describe_delete(
128    scx: &StatementContext,
129    stmt: DeleteStatement<Aug>,
130) -> Result<StatementDesc, PlanError> {
131    query::plan_delete_query(scx, stmt)?;
132    Ok(StatementDesc::new(None))
133}
134
135pub fn plan_delete(
136    scx: &StatementContext,
137    stmt: DeleteStatement<Aug>,
138    params: &Params,
139) -> Result<Plan, PlanError> {
140    let rtw_plan = query::plan_delete_query(scx, stmt)?;
141    plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
142}
143
144pub fn describe_update(
145    scx: &StatementContext,
146    stmt: UpdateStatement<Aug>,
147) -> Result<StatementDesc, PlanError> {
148    query::plan_update_query(scx, stmt)?;
149    Ok(StatementDesc::new(None))
150}
151
152pub fn plan_update(
153    scx: &StatementContext,
154    stmt: UpdateStatement<Aug>,
155    params: &Params,
156) -> Result<Plan, PlanError> {
157    let rtw_plan = query::plan_update_query(scx, stmt)?;
158    plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
159}
160
161pub fn plan_read_then_write(
162    scx: &StatementContext,
163    kind: MutationKind,
164    params: &Params,
165    query::ReadThenWritePlan {
166        id,
167        mut selection,
168        finishing,
169        assignments,
170    }: query::ReadThenWritePlan,
171) -> Result<Plan, PlanError> {
172    selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
173    let mut assignments_outer = BTreeMap::new();
174    for (idx, mut set) in assignments {
175        set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
176        let set = set.lower_uncorrelated(scx.catalog.system_vars())?;
177        assignments_outer.insert(idx, set);
178    }
179
180    Ok(Plan::ReadThenWrite(ReadThenWritePlan {
181        id,
182        selection,
183        finishing,
184        assignments: assignments_outer,
185        kind,
186        returning: Vec::new(),
187    }))
188}
189
190pub fn describe_select(
191    scx: &StatementContext,
192    stmt: SelectStatement<Aug>,
193) -> Result<StatementDesc, PlanError> {
194    if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
195        return Ok(StatementDesc::new(Some(desc)));
196    }
197
198    let query::PlannedRootQuery { desc, .. } =
199        query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
200    Ok(StatementDesc::new(Some(desc)))
201}
202
203pub fn plan_select(
204    scx: &StatementContext,
205    select: SelectStatement<Aug>,
206    params: &Params,
207    copy_to: Option<CopyFormat>,
208) -> Result<Plan, PlanError> {
209    if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
210        return Ok(Plan::SideEffectingFunc(f));
211    }
212
213    let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
214    Ok(Plan::Select(plan))
215}
216
/// Plans the common core of a `SELECT`: the query itself, the optional
/// `AS OF` timestamp, and the top-level `LIMIT`/`OFFSET`, which must reduce
/// to constants once parameters are bound.
///
/// Returns the planned [`SelectPlan`] together with the description of the
/// rows it produces.
fn plan_select_inner(
    scx: &StatementContext,
    select: SelectStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<(SelectPlan, RelationDesc), PlanError> {
    // Plan the `AS OF` clause (if any) before the query itself.
    let when = query::plan_as_of(scx, select.as_of.clone())?;
    let lifetime = QueryLifetime::OneShot;
    let query::PlannedRootQuery {
        mut expr,
        desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
    expr.bind_parameters(scx, lifetime, params)?;

    // OFFSET clauses in `expr` should become constants with the above binding of parameters.
    // Let's check this and simplify them to literals.
    expr.try_visit_mut_pre(&mut |expr| {
        if let HirRelationExpr::TopK { offset, .. } = expr {
            let offset_value = offset_into_value(offset.take())?;
            *offset = HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64);
        }
        Ok::<(), PlanError>(())
    })?;
    // (We don't need to simplify LIMIT clauses in `expr`, because we can handle non-constant
    // expressions there. If they happen to be simplifiable to literals, then the optimizer will do
    // so later.)

    // We need to concretize the `limit` and `offset` of the RowSetFinishing, so that we go from
    // `RowSetFinishing<HirScalarExpr, HirScalarExpr>` to `RowSetFinishing`.
    // This involves binding parameters and evaluating each expression to a number.
    // (This should be possible even for `limit` here, because we are at the top level of a SELECT,
    // so this `limit` has to be a constant.)
    let limit = match finishing.limit {
        None => None,
        Some(mut limit) => {
            limit.bind_parameters(scx, lifetime, params)?;
            // TODO: Call `try_into_literal_int64` instead of `as_literal`.
            let Some(limit) = limit.as_literal() else {
                sql_bail!(
                    "Top-level LIMIT must be a constant expression, got {}",
                    limit
                )
            };
            match limit {
                // `LIMIT NULL` is equivalent to no limit at all.
                Datum::Null => None,
                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
                _ => {
                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
                    sql_bail!("LIMIT must be a non-negative INT or NULL")
                }
            }
        }
    };
    let offset = {
        let mut offset = finishing.offset.clone();
        offset.bind_parameters(scx, lifetime, params)?;
        let offset = offset_into_value(offset.take())?;
        offset
            .try_into()
            .expect("checked in offset_into_value that it is not negative")
    };

    // Unmaterializable functions are evaluated based on various information (e.g., the catalog)
    // during sequencing, without taking AS OF into account. This would be hard to fix, so for now
    // we just disallow AS OF when there is an unmaterializable function in a query (except mz_now).
    if scx.is_feature_flag_enabled(&DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF)
        && select.as_of.is_some()
        && expr.contains_unmaterializable_except_temporal()?
    {
        bail_unsupported!("unmaterializable function (except `mz_now`) in an AS OF query");
    }

    let plan = SelectPlan {
        source: expr,
        when,
        finishing: RowSetFinishing {
            limit,
            offset,
            project: finishing.project,
            order_by: finishing.order_by,
        },
        copy_to,
        // Keep the original statement around alongside the plan.
        select: Some(Box::new(select)),
    };

    Ok((plan, desc))
}
306
307pub fn describe_explain_plan(
308    scx: &StatementContext,
309    explain: ExplainPlanStatement<Aug>,
310) -> Result<StatementDesc, PlanError> {
311    let mut relation_desc = RelationDesc::builder();
312
313    match explain.stage() {
314        ExplainStage::RawPlan => {
315            let name = "Raw Plan";
316            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
317        }
318        ExplainStage::DecorrelatedPlan => {
319            let name = "Decorrelated Plan";
320            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
321        }
322        ExplainStage::LocalPlan => {
323            let name = "Locally Optimized Plan";
324            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
325        }
326        ExplainStage::GlobalPlan => {
327            let name = "Optimized Plan";
328            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
329        }
330        ExplainStage::PhysicalPlan => {
331            let name = "Physical Plan";
332            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
333        }
334        ExplainStage::Trace => {
335            relation_desc = relation_desc
336                .with_column("Time", SqlScalarType::UInt64.nullable(false))
337                .with_column("Path", SqlScalarType::String.nullable(false))
338                .with_column("Plan", SqlScalarType::String.nullable(false));
339        }
340        ExplainStage::PlanInsights => {
341            let name = "Plan Insights";
342            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
343        }
344    };
345    let relation_desc = relation_desc.finish();
346
347    Ok(
348        StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
349            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
350            _ => vec![],
351        }),
352    )
353}
354
355pub fn describe_explain_pushdown(
356    scx: &StatementContext,
357    statement: ExplainPushdownStatement<Aug>,
358) -> Result<StatementDesc, PlanError> {
359    let relation_desc = RelationDesc::builder()
360        .with_column("Source", SqlScalarType::String.nullable(false))
361        .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
362        .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
363        .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
364        .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
365        .finish();
366
367    Ok(
368        StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
369            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
370            _ => vec![],
371        }),
372    )
373}
374
/// Describes the output columns of `EXPLAIN ANALYZE` on a single object.
///
/// The shape depends on the request: `AS SQL` yields a single SQL column;
/// otherwise one column group is added per requested computation property
/// (memory, CPU), with additional per-worker columns when skew reporting is
/// enabled, or the fixed hints layout for `ExplainAnalyzeProperty::Hints`.
pub fn describe_explain_analyze_object(
    _scx: &StatementContext,
    statement: ExplainAnalyzeObjectStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // `AS SQL` renders the query text we would run rather than its results.
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    match statement.properties {
        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
            properties,
            skew,
        }) => {
            let mut relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(false));

            // Skew reporting is per worker, so expose the worker id.
            if skew {
                relation_desc =
                    relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
            }

            let mut seen_properties = BTreeSet::new();
            for property in properties {
                // handle each property only once (belt and suspenders)
                if !seen_properties.insert(property) {
                    continue;
                }

                match property {
                    ExplainAnalyzeComputationProperty::Memory if skew => {
                        let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                        relation_desc = relation_desc
                            .with_column("memory_ratio", numeric.clone())
                            .with_column("worker_memory", SqlScalarType::String.nullable(true))
                            .with_column("avg_memory", SqlScalarType::String.nullable(true))
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column("records_ratio", numeric.clone())
                            .with_column("worker_records", numeric.clone())
                            .with_column("avg_records", numeric.clone())
                            .with_column("total_records", numeric);
                    }
                    ExplainAnalyzeComputationProperty::Memory => {
                        relation_desc = relation_desc
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column(
                                "total_records",
                                SqlScalarType::Numeric { max_scale: None }.nullable(true),
                            );
                    }
                    ExplainAnalyzeComputationProperty::Cpu => {
                        // Unlike `Memory`, the skewed CPU columns are added in
                        // addition to `total_elapsed`, not instead of it.
                        if skew {
                            relation_desc = relation_desc
                                .with_column(
                                    "cpu_ratio",
                                    SqlScalarType::Numeric { max_scale: None }.nullable(true),
                                )
                                .with_column(
                                    "worker_elapsed",
                                    SqlScalarType::Interval.nullable(true),
                                )
                                .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
                        }
                        relation_desc = relation_desc
                            .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
                    }
                }
            }

            let relation_desc = relation_desc.finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
        ExplainAnalyzeProperty::Hints => {
            // Fixed layout for index-hint output.
            let relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(true))
                .with_column("levels", SqlScalarType::Int64.nullable(true))
                .with_column("to_cut", SqlScalarType::Int64.nullable(true))
                .with_column("hint", SqlScalarType::Float64.nullable(true))
                .with_column("savings", SqlScalarType::String.nullable(true))
                .finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
    }
}
461
/// Describes the output columns of `EXPLAIN ANALYZE` over a whole cluster.
///
/// Mirrors [`describe_explain_analyze_object`], but rows are keyed by object
/// and global id rather than by operator, and ratio columns are prefixed with
/// `max_operator_`.
pub fn describe_explain_analyze_cluster(
    _scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
) -> Result<StatementDesc, PlanError> {
    // `AS SQL` renders the query text we would run rather than its results.
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;

    let mut relation_desc = RelationDesc::builder()
        .with_column("object", SqlScalarType::String.nullable(false))
        .with_column("global_id", SqlScalarType::String.nullable(false));

    // Skew reporting is per worker, so expose the worker id.
    if skew {
        relation_desc =
            relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
    }

    let mut seen_properties = BTreeSet::new();
    for property in properties {
        // handle each property only once (belt and suspenders)
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            ExplainAnalyzeComputationProperty::Memory if skew => {
                let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                relation_desc = relation_desc
                    .with_column("max_operator_memory_ratio", numeric.clone())
                    .with_column("worker_memory", SqlScalarType::String.nullable(true))
                    .with_column("avg_memory", SqlScalarType::String.nullable(true))
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column("max_operator_records_ratio", numeric.clone())
                    .with_column("worker_records", numeric.clone())
                    .with_column("avg_records", numeric.clone())
                    .with_column("total_records", numeric);
            }
            ExplainAnalyzeComputationProperty::Memory => {
                relation_desc = relation_desc
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column(
                        "total_records",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    );
            }
            ExplainAnalyzeComputationProperty::Cpu if skew => {
                relation_desc = relation_desc
                    .with_column(
                        "max_operator_cpu_ratio",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    )
                    .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
            ExplainAnalyzeComputationProperty::Cpu => {
                relation_desc = relation_desc
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
        }
    }

    Ok(StatementDesc::new(Some(relation_desc.finish())))
}
531
532pub fn describe_explain_timestamp(
533    scx: &StatementContext,
534    ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
535) -> Result<StatementDesc, PlanError> {
536    let relation_desc = RelationDesc::builder()
537        .with_column("Timestamp", SqlScalarType::String.nullable(false))
538        .finish();
539
540    Ok(StatementDesc::new(Some(relation_desc))
541        .with_params(describe_select(scx, select)?.param_types))
542}
543
544pub fn describe_explain_schema(
545    _: &StatementContext,
546    ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
547) -> Result<StatementDesc, PlanError> {
548    let relation_desc = RelationDesc::builder()
549        .with_column("Schema", SqlScalarType::String.nullable(false))
550        .finish();
551    Ok(StatementDesc::new(Some(relation_desc)))
552}
553
554// Currently, there are two reasons for why a flag should be `Option<bool>` instead of simply
555// `bool`:
556// - When it's an override of a global feature flag, for example optimizer feature flags. In this
557//   case, we need not just false and true, but also None to say "take the value of the global
558//   flag".
559// - When it's an override of whether SOFT_ASSERTIONS are enabled. For example, when `Arity` is not
560//   explicitly given in the EXPLAIN command, then we'd like staging and prod to default to true,
561//   but otherwise we'd like to default to false.
// Generates `ExplainPlanOptionExtracted`, which gathers the `WITH (...)`
// options accepted by `EXPLAIN PLAN` (each entry: option name, value type,
// default). `Option<bool>` entries distinguish "unset" from explicit
// false/true — see the comment above for why.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
596
impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
    type Error = PlanError;

    /// Converts the extracted `EXPLAIN ... WITH (...)` options into an
    /// [`ExplainConfig`], filling in environment-dependent defaults for
    /// options the user did not specify explicitly.
    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
        // If `WITH(raw)` is specified, ensure that the config will be as
        // representative for the original plan as possible.
        if v.raw {
            v.raw_plans = true;
            v.raw_syntax = true;
        }

        // Certain config should default to be enabled in release builds running on
        // staging or prod (where SOFT_ASSERTIONS are turned off).
        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();

        Ok(ExplainConfig {
            arity: v.arity.unwrap_or(enable_on_prod),
            cardinality: v.cardinality,
            column_names: v.column_names,
            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
            // Humanized expressions are suppressed when raw plans are requested.
            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
            join_impls: v.join_implementations,
            keys: v.keys,
            // Linear chains are likewise incompatible with raw plans.
            linear_chains: !v.raw_plans && v.linear_chains,
            no_fast_path: v.no_fast_path,
            no_notices: v.no_notices,
            node_ids: v.node_identifiers,
            non_negative: v.non_negative,
            raw_plans: v.raw_plans,
            raw_syntax: v.raw_syntax,
            verbose_syntax: false,
            redacted: v.redacted,
            subtree_size: v.subtree_size,
            equivalences: v.equivalences,
            timing: v.timing,
            types: v.types,
            // The ones that are initialized with `Default::default()` are not wired up to EXPLAIN.
            features: OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: Default::default(),
                enable_eager_delta_joins: v.enable_eager_delta_joins,
                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
                enable_consolidate_after_union_negate: Default::default(),
                enable_reduce_mfp_fusion: Default::default(),
                enable_cardinality_estimates: Default::default(),
                persist_fast_path_limit: Default::default(),
                reoptimize_imported_views: v.reoptimize_imported_views,
                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse: v
                    .enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: Default::default(),
                enable_dequadratic_eqprop_map: Default::default(),
                enable_eq_classes_withholding_errors: Default::default(),
                enable_fast_path_plan_insights: Default::default(),
                enable_cast_elimination: Default::default(),
            },
        })
    }
}
657
658fn plan_explainee(
659    scx: &StatementContext,
660    explainee: Explainee<Aug>,
661    params: &Params,
662) -> Result<plan::Explainee, PlanError> {
663    use crate::plan::ExplaineeStatement;
664
665    let is_replan = matches!(
666        explainee,
667        Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
668    );
669
670    let explainee = match explainee {
671        Explainee::View(name) | Explainee::ReplanView(name) => {
672            let item = scx.get_item_by_resolved_name(&name)?;
673            let item_type = item.item_type();
674            if item_type != CatalogItemType::View {
675                sql_bail!("Expected {name} to be a view, not a {item_type}");
676            }
677            match is_replan {
678                true => crate::plan::Explainee::ReplanView(item.id()),
679                false => crate::plan::Explainee::View(item.id()),
680            }
681        }
682        Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
683            let item = scx.get_item_by_resolved_name(&name)?;
684            let item_type = item.item_type();
685            if item_type != CatalogItemType::MaterializedView {
686                sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
687            }
688            match is_replan {
689                true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
690                false => crate::plan::Explainee::MaterializedView(item.id()),
691            }
692        }
693        Explainee::Index(name) | Explainee::ReplanIndex(name) => {
694            let item = scx.get_item_by_resolved_name(&name)?;
695            let item_type = item.item_type();
696            if item_type != CatalogItemType::Index {
697                sql_bail!("Expected {name} to be an index, not a {item_type}");
698            }
699            match is_replan {
700                true => crate::plan::Explainee::ReplanIndex(item.id()),
701                false => crate::plan::Explainee::Index(item.id()),
702            }
703        }
704        Explainee::Select(select, broken) => {
705            let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
706            crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
707        }
708        Explainee::CreateView(mut stmt, broken) => {
709            if stmt.if_exists != IfExistsBehavior::Skip {
710                // If we don't force this parameter to Skip planning will
711                // fail for names that already exist in the catalog. This
712                // can happen even in `Replace` mode if the existing item
713                // has dependencies.
714                stmt.if_exists = IfExistsBehavior::Skip;
715            } else {
716                sql_bail!(
717                    "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
718                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
719                );
720            }
721
722            let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
723                sql_bail!("expected CreateViewPlan plan");
724            };
725
726            crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
727        }
728        Explainee::CreateMaterializedView(mut stmt, broken) => {
729            if stmt.if_exists != IfExistsBehavior::Skip {
730                // If we don't force this parameter to Skip planning will
731                // fail for names that already exist in the catalog. This
732                // can happen even in `Replace` mode if the existing item
733                // has dependencies.
734                stmt.if_exists = IfExistsBehavior::Skip;
735            } else {
736                sql_bail!(
737                    "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
738                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
739                );
740            }
741
742            let Plan::CreateMaterializedView(plan) =
743                ddl::plan_create_materialized_view(scx, *stmt)?
744            else {
745                sql_bail!("expected CreateMaterializedViewPlan plan");
746            };
747
748            crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
749                broken,
750                plan,
751            })
752        }
753        Explainee::CreateIndex(mut stmt, broken) => {
754            if !stmt.if_not_exists {
755                // If we don't force this parameter to true planning will
756                // fail for index items that already exist in the catalog.
757                stmt.if_not_exists = true;
758            } else {
759                sql_bail!(
760                    "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
761                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
762                );
763            }
764
765            let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
766                sql_bail!("expected CreateIndexPlan plan");
767            };
768
769            crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
770        }
771        Explainee::Subscribe(stmt, broken) => {
772            let Plan::Subscribe(plan) = plan_subscribe(scx, *stmt, params, None)? else {
773                sql_bail!("expected SubscribePlan");
774            };
775            crate::plan::Explainee::Statement(ExplaineeStatement::Subscribe { broken, plan })
776        }
777    };
778
779    Ok(explainee)
780}
781
782pub fn plan_explain_plan(
783    scx: &StatementContext,
784    explain: ExplainPlanStatement<Aug>,
785    params: &Params,
786) -> Result<Plan, PlanError> {
787    let (format, verbose_syntax) = match explain.format() {
788        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
789        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
790        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
791        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
792    };
793    let stage = explain.stage();
794
795    // Plan ExplainConfig.
796    let mut config = {
797        let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
798
799        if !scx.catalog.system_vars().persist_stats_filter_enabled() {
800            // If filtering is disabled, explain plans should not include pushdown info.
801            with_options.filter_pushdown = Some(false);
802        }
803
804        ExplainConfig::try_from(with_options)?
805    };
806    config.verbose_syntax = verbose_syntax;
807
808    let explainee = plan_explainee(scx, explain.explainee, params)?;
809
810    Ok(Plan::ExplainPlan(ExplainPlanPlan {
811        stage,
812        format,
813        config,
814        explainee,
815    }))
816}
817
818pub fn plan_explain_schema(
819    scx: &StatementContext,
820    explain_schema: ExplainSinkSchemaStatement<Aug>,
821) -> Result<Plan, PlanError> {
822    let ExplainSinkSchemaStatement {
823        schema_for,
824        // Parser limits to JSON.
825        format: _,
826        mut statement,
827    } = explain_schema;
828
829    // Force the sink's name to one that's guaranteed not to exist, by virtue of
830    // being a non-existent item in a schema under the system's control, so that
831    // `plan_create_sink` doesn't complain about the name already existing.
832    statement.name = Some(UnresolvedItemName::qualified(&[
833        ident!("mz_catalog"),
834        ident!("mz_explain_schema"),
835    ]));
836
837    crate::pure::purify_create_sink_avro_doc_on_options(
838        scx.catalog,
839        *statement.from.item_id(),
840        &mut statement.format,
841    )?;
842
843    match ddl::plan_create_sink(scx, statement)? {
844        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
845            StorageSinkConnection::Kafka(KafkaSinkConnection {
846                format:
847                    KafkaSinkFormat {
848                        key_format,
849                        value_format:
850                            KafkaSinkFormatType::Avro {
851                                schema: value_schema,
852                                ..
853                            },
854                        ..
855                    },
856                ..
857            }) => {
858                let schema = match schema_for {
859                    ExplainSinkSchemaFor::Key => key_format
860                        .and_then(|f| match f {
861                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
862                            _ => None,
863                        })
864                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
865                    ExplainSinkSchemaFor::Value => value_schema,
866                };
867
868                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
869                    sink_from: sink.from,
870                    json_schema: schema,
871                }))
872            }
873            _ => bail_unsupported!(
874                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
875            ),
876        },
877        _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
878    }
879}
880
881pub fn plan_explain_pushdown(
882    scx: &StatementContext,
883    statement: ExplainPushdownStatement<Aug>,
884    params: &Params,
885) -> Result<Plan, PlanError> {
886    scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
887    let explainee = plan_explainee(scx, statement.explainee, params)?;
888    Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
889}
890
/// Plans an `EXPLAIN ANALYZE ... <object>` statement.
///
/// Rather than producing a plan directly, this generates a SQL query over the
/// `mz_introspection` relations (keyed on `mz_lir_mapping`) and either plans
/// that query, or — when `AS SQL` was requested — returns its pretty-printed
/// text as a one-row, one-column result.
///
/// # Errors
///
/// Fails if the explainee is anonymous, if it is not an index or materialized
/// view, or if the generated query cannot be parsed/planned.
pub fn plan_explain_analyze_object(
    scx: &StatementContext,
    statement: ExplainAnalyzeObjectStatement<Aug>,
    params: &Params,
) -> Result<Plan, PlanError> {
    // We need a catalog name to filter `mz_mappable_objects` below, so
    // anonymous explainees are rejected up front.
    let explainee_name = statement
        .explainee
        .name()
        .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
        .full_name_str();
    let explainee = plan_explainee(scx, statement.explainee, params)?;

    // Only indexes and materialized views correspond to installed dataflows
    // with introspection data; reject every other explainee type.
    match explainee {
        plan::Explainee::Index(_index_id) => (),
        plan::Explainee::MaterializedView(_item_id) => (),
        _ => {
            return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
        }
    };

    // generate SQL query

    /* WITH {CTEs}
       SELECT REPEAT(' ', nesting * 2) || operator AS operator
             {columns}
        FROM      mz_introspection.mz_lir_mapping mlm
             JOIN {from} USING (lir_id)
             JOIN mz_introspection.mz_mappable_objects mo
               ON (mlm.global_id = mo.global_id)
       WHERE     mo.name = '{plan.explainee_name}'
             AND {predicates}
       ORDER BY lir_id DESC
    */
    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
    let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
    let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
    let mut predicates = vec![format!("mo.name = '{}'", explainee_name)];
    let mut order_by = vec!["mlm.lir_id DESC"];

    // Each requested property contributes CTEs, output columns, joins,
    // predicates, and sort keys to the query skeleton sketched above.
    match statement.properties {
        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
            properties,
            skew,
        }) => {
            // SQL name of the worker-id column introduced by the first
            // skew-enabled property; later skew-enabled properties equi-join
            // on it rather than emitting a second worker_id column.
            let mut worker_id = None;
            let mut seen_properties = BTreeSet::new();
            for property in properties {
                // handle each property only once (belt and suspenders)
                if !seen_properties.insert(property) {
                    continue;
                }

                match property {
                    // Memory: arrangement sizes/records summed over each LIR
                    // operator's dataflow-operator ID range.
                    ExplainAnalyzeComputationProperty::Memory => {
                        ctes.push((
                            "summary_memory",
                            r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         SUM(mas.size) AS total_memory,
         SUM(mas.records) AS total_records,
         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
                 ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                        ));
                        from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");

                        if skew {
                            // WITH SKEW: also break memory out per worker and
                            // report each worker's ratio against the average.
                            ctes.push((
                                "per_worker_memory",
                                r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         mas.worker_id AS worker_id,
         SUM(mas.size) AS worker_memory,
         SUM(mas.records) AS worker_records
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
                 ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                            ));
                            from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");

                            if let Some(worker_id) = worker_id {
                                predicates.push(format!("pwm.worker_id = {worker_id}"));
                            } else {
                                worker_id = Some("pwm.worker_id");
                                columns.push("pwm.worker_id AS worker_id");
                                order_by.push("worker_id");
                            }

                            columns.extend([
                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
                                "pg_size_pretty(pwm.worker_memory) AS worker_memory",
                                "pg_size_pretty(sm.avg_memory) AS avg_memory",
                                "pg_size_pretty(sm.total_memory) AS total_memory",
                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
                                "pwm.worker_records AS worker_records",
                                "sm.avg_records AS avg_records",
                                "sm.total_records AS total_records",
                            ]);
                        } else {
                            // No skew: totals only.
                            columns.extend([
                                "pg_size_pretty(sm.total_memory) AS total_memory",
                                "sm.total_records AS total_records",
                            ]);
                        }
                    }
                    // CPU: scheduling elapsed time summed over each LIR
                    // operator's dataflow-operator ID range.
                    ExplainAnalyzeComputationProperty::Cpu => {
                        ctes.push((
                            "summary_cpu",
                            r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         SUM(mse.elapsed_ns) AS total_ns,
         CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
                 ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                        ));
                        from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");

                        if skew {
                            // WITH SKEW: per-worker elapsed time and its ratio
                            // against the per-worker average.
                            ctes.push((
                                "per_worker_cpu",
                                r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         mse.worker_id AS worker_id,
         SUM(mse.elapsed_ns) AS worker_ns
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
                 ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                            ));
                            from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");

                            if let Some(worker_id) = worker_id {
                                predicates.push(format!("pwc.worker_id = {worker_id}"));
                            } else {
                                worker_id = Some("pwc.worker_id");
                                columns.push("pwc.worker_id AS worker_id");
                                order_by.push("worker_id");
                            }

                            columns.extend([
                                "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
                                "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
                                "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
                            ]);
                        }
                        // Total elapsed time is reported with or without skew.
                        columns.push(
                            "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
                        );
                    }
                }
            }
        }
        // Hints: expected-group-size advice from
        // `mz_expected_group_size_advice`, matched by region/dataflow ID.
        ExplainAnalyzeProperty::Hints => {
            columns.extend([
                "megsa.levels AS levels",
                "megsa.to_cut AS to_cut",
                "megsa.hint AS hint",
                "pg_size_pretty(megsa.savings) AS savings",
            ]);
            from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
            "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
             mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
        }
    }

    from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");

    // Assemble the final query text from the accumulated pieces.
    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    let predicates = separated(" AND ", predicates);
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
WHERE {predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // AS SQL: return the pretty-printed query text instead of running it.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        // Otherwise plan the generated query as if it were a SHOW statement.
        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        show_select.plan()
    }
}
1110
1111pub fn plan_explain_analyze_cluster(
1112    scx: &StatementContext,
1113    statement: ExplainAnalyzeClusterStatement,
1114    _params: &Params,
1115) -> Result<Plan, PlanError> {
1116    // object string
1117    // worker_id uint64        (if           skew)
1118    // memory_ratio numeric    (if memory && skew)
1119    // worker_memory string    (if memory && skew)
1120    // avg_memory string       (if memory && skew)
1121    // total_memory string     (if memory)
1122    // records_ratio numeric   (if memory && skew)
1123    // worker_records          (if memory && skew)
1124    // avg_records numeric     (if memory && skew)
1125    // total_records numeric   (if memory)
1126    // cpu_ratio numeric       (if cpu    && skew)
1127    // worker_elapsed interval (if cpu    && skew)
1128    // avg_elapsed interval    (if cpu    && skew)
1129    // total_elapsed interval  (if cpu)
1130
1131    /* WITH {CTEs}
1132       SELECT mo.name AS object
1133             {columns}
1134        FROM mz_introspection.mz_mappable_objects mo
1135             {from}
1136       WHERE {predicates}
1137       ORDER BY {order_by}, mo.name DESC
1138    */
1139    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
1140    let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
1141    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
1142    let mut predicates = vec![];
1143    let mut order_by = vec![];
1144
1145    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
1146    let mut worker_id = None;
1147    let mut seen_properties = BTreeSet::new();
1148    for property in properties {
1149        // handle each property only once (belt and suspenders)
1150        if !seen_properties.insert(property) {
1151            continue;
1152        }
1153
1154        match property {
1155            ExplainAnalyzeComputationProperty::Memory => {
1156                if skew {
1157                    let mut set_worker_id = false;
1158                    if let Some(worker_id) = worker_id {
1159                        // join condition if we're showing skew for more than one property
1160                        predicates.push(format!("om.worker_id = {worker_id}"));
1161                    } else {
1162                        worker_id = Some("om.worker_id");
1163                        columns.push("om.worker_id AS worker_id");
1164                        set_worker_id = true; // we'll add ourselves to `order_by` later
1165                    };
1166
1167                    // computes the average memory per LIR operator (for per operator ratios)
1168                    ctes.push((
1169                    "per_operator_memory_summary",
1170                    r#"
1171SELECT mlm.global_id AS global_id,
1172       mlm.lir_id AS lir_id,
1173       SUM(mas.size) AS total_memory,
1174       SUM(mas.records) AS total_records,
1175       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
1176       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
1177FROM        mz_introspection.mz_lir_mapping mlm
1178 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1179       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1180         ON (mas.operator_id = valid_id)
1181GROUP BY mlm.global_id, mlm.lir_id"#,
1182                ));
1183
1184                    // computes the memory per worker in a per operator way
1185                    ctes.push((
1186                    "per_operator_memory_per_worker",
1187                    r#"
1188SELECT mlm.global_id AS global_id,
1189       mlm.lir_id AS lir_id,
1190       mas.worker_id AS worker_id,
1191       SUM(mas.size) AS worker_memory,
1192       SUM(mas.records) AS worker_records
1193FROM        mz_introspection.mz_lir_mapping mlm
1194 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1195       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1196         ON (mas.operator_id = valid_id)
1197GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
1198                    ));
1199
1200                    // computes memory ratios per worker per operator
1201                    ctes.push((
1202                    "per_operator_memory_ratios",
1203                    r#"
1204SELECT pompw.global_id AS global_id,
1205       pompw.lir_id AS lir_id,
1206       pompw.worker_id AS worker_id,
1207       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
1208       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
1209  FROM      per_operator_memory_per_worker pompw
1210       JOIN per_operator_memory_summary poms
1211         USING (global_id, lir_id)
1212"#,
1213                    ));
1214
1215                    // summarizes each object, per worker
1216                    ctes.push((
1217                        "object_memory",
1218                        r#"
1219SELECT pompw.global_id AS global_id,
1220       pompw.worker_id AS worker_id,
1221       MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
1222       MAX(pomr.records_ratio) AS max_operator_records_ratio,
1223       SUM(pompw.worker_memory) AS worker_memory,
1224       SUM(pompw.worker_records) AS worker_records
1225FROM        per_operator_memory_per_worker pompw
1226     JOIN   per_operator_memory_ratios pomr
1227     USING (global_id, worker_id, lir_id)
1228GROUP BY pompw.global_id, pompw.worker_id
1229"#,
1230                    ));
1231
1232                    // summarizes each worker
1233                    ctes.push(("object_average_memory", r#"
1234SELECT om.global_id AS global_id,
1235       SUM(om.worker_memory) AS total_memory,
1236       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
1237       SUM(om.worker_records) AS total_records,
1238       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
1239  FROM object_memory om
1240GROUP BY om.global_id"#));
1241
1242                    from.push("LEFT JOIN object_memory om USING (global_id)");
1243                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");
1244
1245                    columns.extend([
1246                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
1247                        "pg_size_pretty(om.worker_memory) AS worker_memory",
1248                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
1249                        "pg_size_pretty(oam.total_memory) AS total_memory",
1250                        "om.max_operator_records_ratio AS max_operator_records_ratio",
1251                        "om.worker_records AS worker_records",
1252                        "oam.avg_records AS avg_records",
1253                        "oam.total_records AS total_records",
1254                    ]);
1255
1256                    order_by.extend([
1257                        "max_operator_memory_ratio DESC",
1258                        "max_operator_records_ratio DESC",
1259                        "om.worker_memory DESC",
1260                        "worker_records DESC",
1261                    ]);
1262
1263                    if set_worker_id {
1264                        order_by.push("worker_id");
1265                    }
1266                } else {
1267                    // no skew, so just compute totals
1268                    ctes.push((
1269                        "per_operator_memory_totals",
1270                        r#"
1271    SELECT mlm.global_id AS global_id,
1272           mlm.lir_id AS lir_id,
1273           SUM(mas.size) AS total_memory,
1274           SUM(mas.records) AS total_records
1275    FROM        mz_introspection.mz_lir_mapping mlm
1276     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1277           JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1278             ON (mas.operator_id = valid_id)
1279    GROUP BY mlm.global_id, mlm.lir_id"#,
1280                    ));
1281
1282                    ctes.push((
1283                        "object_memory_totals",
1284                        r#"
1285SELECT pomt.global_id AS global_id,
1286       SUM(pomt.total_memory) AS total_memory,
1287       SUM(pomt.total_records) AS total_records
1288FROM per_operator_memory_totals pomt
1289GROUP BY pomt.global_id
1290"#,
1291                    ));
1292
1293                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
1294                    columns.extend([
1295                        "pg_size_pretty(omt.total_memory) AS total_memory",
1296                        "omt.total_records AS total_records",
1297                    ]);
1298                    order_by.extend(["omt.total_memory DESC", "total_records DESC"]);
1299                }
1300            }
1301            ExplainAnalyzeComputationProperty::Cpu => {
1302                if skew {
1303                    let mut set_worker_id = false;
1304                    if let Some(worker_id) = worker_id {
1305                        // join condition if we're showing skew for more than one property
1306                        predicates.push(format!("oc.worker_id = {worker_id}"));
1307                    } else {
1308                        worker_id = Some("oc.worker_id");
1309                        columns.push("oc.worker_id AS worker_id");
1310                        set_worker_id = true; // we'll add ourselves to `order_by` later
1311                    };
1312
1313                    // computes the average memory per LIR operator (for per operator ratios)
1314                    ctes.push((
1315    "per_operator_cpu_summary",
1316    r#"
1317SELECT mlm.global_id AS global_id,
1318       mlm.lir_id AS lir_id,
1319       SUM(mse.elapsed_ns) AS total_ns,
1320       CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
1321FROM       mz_introspection.mz_lir_mapping mlm
1322CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1323      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1324        ON (mse.id = valid_id)
1325GROUP BY mlm.global_id, mlm.lir_id"#,
1326));
1327
1328                    // computes the CPU per worker in a per operator way
1329                    ctes.push((
1330                        "per_operator_cpu_per_worker",
1331                        r#"
1332SELECT mlm.global_id AS global_id,
1333       mlm.lir_id AS lir_id,
1334       mse.worker_id AS worker_id,
1335       SUM(mse.elapsed_ns) AS worker_ns
1336FROM       mz_introspection.mz_lir_mapping mlm
1337CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1338      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1339        ON (mse.id = valid_id)
1340GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1341                    ));
1342
1343                    // computes CPU ratios per worker per operator
1344                    ctes.push((
1345                        "per_operator_cpu_ratios",
1346                        r#"
1347SELECT pocpw.global_id AS global_id,
1348       pocpw.lir_id AS lir_id,
1349       pocpw.worker_id AS worker_id,
1350       CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
1351FROM      per_operator_cpu_per_worker pocpw
1352     JOIN per_operator_cpu_summary pocs
1353     USING (global_id, lir_id)
1354"#,
1355                    ));
1356
1357                    // summarizes each object, per worker
1358                    ctes.push((
1359                        "object_cpu",
1360                        r#"
1361SELECT pocpw.global_id AS global_id,
1362       pocpw.worker_id AS worker_id,
1363       MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
1364       SUM(pocpw.worker_ns) AS worker_ns
1365FROM      per_operator_cpu_per_worker pocpw
1366     JOIN per_operator_cpu_ratios pomr
1367     USING (global_id, worker_id, lir_id)
1368GROUP BY pocpw.global_id, pocpw.worker_id
1369"#,
1370                    ));
1371
1372                    // summarizes each worker
1373                    ctes.push((
1374                        "object_average_cpu",
1375                        r#"
1376SELECT oc.global_id AS global_id,
1377       SUM(oc.worker_ns) AS total_ns,
1378       CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
1379  FROM object_cpu oc
1380GROUP BY oc.global_id"#,));
1381
1382                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
1383                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");
1384
1385                    columns.extend([
1386                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
1387                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
1388                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
1389                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
1390                    ]);
1391
1392                    order_by.extend(["max_operator_cpu_ratio DESC", "worker_elapsed DESC"]);
1393
1394                    if set_worker_id {
1395                        order_by.push("worker_id");
1396                    }
1397                } else {
1398                    // no skew, so just compute totals
1399                    ctes.push((
1400                        "per_operator_cpu_totals",
1401                        r#"
1402    SELECT mlm.global_id AS global_id,
1403           mlm.lir_id AS lir_id,
1404           SUM(mse.elapsed_ns) AS total_ns
1405    FROM        mz_introspection.mz_lir_mapping mlm
1406     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1407           JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1408             ON (mse.id = valid_id)
1409    GROUP BY mlm.global_id, mlm.lir_id"#,
1410                    ));
1411
1412                    ctes.push((
1413                        "object_cpu_totals",
1414                        r#"
1415SELECT poct.global_id AS global_id,
1416       SUM(poct.total_ns) AS total_ns
1417FROM per_operator_cpu_totals poct
1418GROUP BY poct.global_id
1419"#,
1420                    ));
1421
1422                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
1423                    columns
1424                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
1425                    order_by.extend(["total_elapsed DESC"]);
1426                }
1427            }
1428        }
1429    }
1430
1431    // generate SQL query text
1432    let ctes = if !ctes.is_empty() {
1433        format!(
1434            "WITH {}",
1435            separated(
1436                ",\n",
1437                ctes.iter()
1438                    .map(|(name, defn)| format!("{name} AS ({defn})"))
1439            )
1440        )
1441    } else {
1442        String::new()
1443    };
1444    let columns = separated(", ", columns);
1445    let from = separated(" ", from);
1446    let predicates = if !predicates.is_empty() {
1447        format!("WHERE {}", separated(" AND ", predicates))
1448    } else {
1449        String::new()
1450    };
1451    // add mo.name last, to break ties only
1452    order_by.push("mo.name DESC");
1453    let order_by = separated(", ", order_by);
1454    let query = format!(
1455        r#"{ctes}
1456SELECT {columns}
1457FROM {from}
1458{predicates}
1459ORDER BY {order_by}"#
1460    );
1461
1462    if statement.as_sql {
1463        let rows = vec![Row::pack_slice(&[Datum::String(
1464            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1465                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1466            })?,
1467        )])];
1468        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1469
1470        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1471    } else {
1472        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1473        show_select.plan()
1474    }
1475}
1476
1477pub fn plan_explain_timestamp(
1478    scx: &StatementContext,
1479    explain: ExplainTimestampStatement<Aug>,
1480) -> Result<Plan, PlanError> {
1481    let (format, _verbose_syntax) = match explain.format() {
1482        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1483        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1484        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1485        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1486    };
1487
1488    let raw_plan = {
1489        let query::PlannedRootQuery {
1490            expr: raw_plan,
1491            desc: _,
1492            finishing: _,
1493            scope: _,
1494        } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1495        if raw_plan.contains_parameters()? {
1496            return Err(PlanError::ParameterNotAllowed(
1497                "EXPLAIN TIMESTAMP".to_string(),
1498            ));
1499        }
1500
1501        raw_plan
1502    };
1503    let when = query::plan_as_of(scx, explain.select.as_of)?;
1504
1505    Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1506        format,
1507        raw_plan,
1508        when,
1509    }))
1510}
1511
1512/// Plans and decorrelates a [`Query`]. Like [`query::plan_root_query`], but
1513/// returns an [`MirRelationExpr`], which cannot include correlated expressions.
1514#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."]
1515pub fn plan_query(
1516    scx: &StatementContext,
1517    query: Query<Aug>,
1518    params: &Params,
1519    lifetime: QueryLifetime,
1520) -> Result<query::PlannedRootQuery<MirRelationExpr>, PlanError> {
1521    let query::PlannedRootQuery {
1522        mut expr,
1523        desc,
1524        finishing,
1525        scope,
1526    } = query::plan_root_query(scx, query, lifetime)?;
1527    expr.bind_parameters(scx, lifetime, params)?;
1528
1529    Ok(query::PlannedRootQuery {
1530        // No metrics passed! One more reason not to use this deprecated function.
1531        expr: expr.lower(scx.catalog.system_vars(), None)?,
1532        desc,
1533        finishing,
1534        scope,
1535    })
1536}
1537
1538generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1539
/// Computes the [`StatementDesc`] for a `SUBSCRIBE` statement.
///
/// The described relation starts with an `mz_timestamp` column, then (when the
/// `PROGRESS` option is set) an `mz_progressed` column, then output-mode
/// specific columns (`mz_diff` for diff output, `mz_state` plus key/value
/// columns for the upsert/Debezium envelopes), followed by the subscribed
/// relation's own columns.
pub fn describe_subscribe(
    scx: &StatementContext,
    stmt: SubscribeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // Resolve the description of what is being subscribed to: either a named
    // catalog item (which must have a relation description) or an ad-hoc query.
    let relation_desc = match stmt.relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            match item.relation_desc() {
                Some(desc) => desc.into_owned(),
                // Items without a relation description (e.g. non-relational
                // item kinds) cannot be subscribed to.
                None => sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                ),
            }
        }
        SubscribeRelation::Query(query) => {
            let query::PlannedRootQuery { desc, .. } =
                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
            desc
        }
    };
    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
    let progress = progress.unwrap_or(false);
    // Every SUBSCRIBE emits a leading, non-null `mz_timestamp` column.
    let mut desc = RelationDesc::builder().with_column(
        "mz_timestamp",
        SqlScalarType::Numeric {
            max_scale: Some(NumericMaxScale::ZERO),
        }
        .nullable(false),
    );
    if progress {
        desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
    }

    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
    match stmt.output {
        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
            desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
            for (name, mut ty) in relation_desc.into_iter() {
                // With PROGRESS, data columns are forced nullable so that
                // progress rows can leave them NULL.
                if progress {
                    ty.nullable = true;
                }
                desc = desc.with_column(name, ty);
            }
        }
        SubscribeOutput::EnvelopeUpsert { key_columns }
        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
            desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
            let key_columns = key_columns
                .into_iter()
                .map(normalize::column_name)
                .collect_vec();
            // Value columns are collected into separate builders so the
            // `before_*` block (Debezium only) can precede the value block.
            let mut before_values_desc = RelationDesc::builder();
            let mut after_values_desc = RelationDesc::builder();

            // Add the key columns in the order that they're specified.
            for column_name in &key_columns {
                let mut column_ty = relation_desc
                    .get_by_name(column_name)
                    .map(|(_pos, ty)| ty.clone())
                    // A key column that doesn't exist in the relation is a
                    // user error.
                    .ok_or_else(|| PlanError::UnknownColumn {
                        table: None,
                        column: column_name.clone(),
                        similar: Box::new([]),
                    })?;
                if progress {
                    column_ty.nullable = true;
                }
                desc = desc.with_column(column_name, column_ty);
            }

            // Then add the remaining columns in the order from the original
            // table, filtering out the key columns since we added those above.
            for (mut name, mut ty) in relation_desc
                .into_iter()
                .filter(|(name, _ty)| !key_columns.contains(name))
            {
                // Value columns are always nullable: deletions/progress rows
                // carry NULLs in these positions.
                ty.nullable = true;
                before_values_desc =
                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
                if debezium {
                    name = format!("after_{}", name).into();
                }
                after_values_desc = after_values_desc.with_column(name, ty);
            }

            // Debezium output interleaves `before_*` columns ahead of the
            // (renamed `after_*`) value columns.
            if debezium {
                desc = desc.concat(before_values_desc);
            }
            desc = desc.concat(after_values_desc);
        }
    }
    Ok(StatementDesc::new(Some(desc.finish())))
}
1635
/// Plans a `SUBSCRIBE` statement into a [`Plan::Subscribe`].
///
/// Resolves the subscription source (named item or ad-hoc query), the `AS OF`
/// / `UP TO` bounds, and the output mode (plain diffs, upsert/Debezium
/// envelope keyed on `key_columns`, or `WITHIN TIMESTAMP ORDER BY`).
///
/// `copy_to` is `Some` when this subscribe is the body of a
/// `COPY ... TO STDOUT`.
pub fn plan_subscribe(
    scx: &StatementContext,
    SubscribeStatement {
        relation,
        options,
        as_of,
        up_to,
        output,
    }: SubscribeStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<Plan, PlanError> {
    let (from, desc, scope) = match relation {
        // Subscribing to a named item: it must have a relation description.
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            let Some(desc) = item.relation_desc() else {
                sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                );
            };
            let item_name = match name {
                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
                _ => None,
            };
            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
            (
                SubscribeFrom::Id(item.global_id()),
                desc.into_owned(),
                scope,
            )
        }
        // Subscribing to an ad-hoc query: plan and lower it to MIR.
        SubscribeRelation::Query(query) => {
            #[allow(deprecated)] // TODO(aalexandrov): Use HirRelationExpr in Subscribe
            let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?;
            // There's no way to apply finishing operations to a `SUBSCRIBE` directly, so the
            // finishing should have already been turned into a `TopK` by
            // `plan_query` / `plan_root_query`, upon seeing the `QueryLifetime::Subscribe`.
            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
                &query.finishing,
                query.desc.arity()
            ));
            // One copy of the desc goes into `SubscribeFrom::Query`, the other
            // is used below for the output-mode `ExprContext`.
            let desc = query.desc.clone();
            (
                SubscribeFrom::Query {
                    expr: query.expr,
                    desc: query.desc,
                },
                desc,
                query.scope,
            )
        }
    };

    // Resolve the temporal bounds of the subscription.
    let when = query::plan_as_of(scx, as_of)?;
    let up_to = up_to
        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
        .transpose()?;

    // Expression context used to plan key columns / ORDER BY expressions in
    // the output-mode clauses below. Note that aggregates and window
    // functions are disallowed there.
    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
    let ecx = ExprContext {
        qcx: &qcx,
        name: "",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
    let output = match output {
        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
        SubscribeOutput::EnvelopeUpsert { key_columns } => {
            // Treat each key column as an ORDER BY item so it can be resolved
            // against the output columns by the standard planner.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE UPSERT KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            // Non-empty `map_exprs` means a key was a computed expression
            // rather than a plain column reference, which is not allowed.
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
            }
            plan::SubscribeOutput::EnvelopeUpsert {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::EnvelopeDebezium { key_columns } => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
            // Same key-resolution scheme as `ENVELOPE UPSERT` above.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE DEBEZIUM KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
            }
            plan::SubscribeOutput::EnvelopeDebezium {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
            // `mz_diff` is addressable in this clause; prepend it (shifting
            // the real output columns right by one) before planning.
            let mz_diff = "mz_diff".into();
            let output_columns = std::iter::once((0, &mz_diff))
                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
                .collect_vec();
            match query::plan_order_by_exprs(
                &ExprContext {
                    name: "WITHIN TIMESTAMP ORDER BY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            ) {
                Err(PlanError::UnknownColumn {
                    table: None,
                    column,
                    similar: _,
                }) if &column == &mz_diff => {
                    // mz_diff is being used in an expression. Since mz_diff isn't part of the table
                    // it looks like an unknown column. Instead, return a better error
                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                }
                Err(e) => return Err(e),
                Ok((order_by, map_exprs)) => {
                    // Computed ORDER BY expressions are not allowed here
                    // either; only plain column references.
                    if !map_exprs.is_empty() {
                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                    }

                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
                }
            }
        }
    };

    // Defaults: snapshot on, progress off.
    let SubscribeOptionExtracted {
        progress, snapshot, ..
    } = options.try_into()?;
    Ok(Plan::Subscribe(SubscribePlan {
        from,
        when,
        up_to,
        with_snapshot: snapshot.unwrap_or(true),
        copy_to,
        emit_progress: progress.unwrap_or(false),
        output,
    }))
}
1808
1809pub fn describe_copy_from_table(
1810    scx: &StatementContext,
1811    table_name: <Aug as AstInfo>::ItemName,
1812    columns: Vec<Ident>,
1813) -> Result<StatementDesc, PlanError> {
1814    let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1815    Ok(StatementDesc::new(Some(desc)))
1816}
1817
1818pub fn describe_copy_item(
1819    scx: &StatementContext,
1820    object_name: <Aug as AstInfo>::ItemName,
1821    columns: Vec<Ident>,
1822) -> Result<StatementDesc, PlanError> {
1823    let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1824    Ok(StatementDesc::new(Some(desc)))
1825}
1826
1827pub fn describe_copy(
1828    scx: &StatementContext,
1829    CopyStatement {
1830        relation,
1831        direction,
1832        ..
1833    }: CopyStatement<Aug>,
1834) -> Result<StatementDesc, PlanError> {
1835    Ok(match (relation, direction) {
1836        (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1837            describe_copy_item(scx, name, columns)?
1838        }
1839        (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1840            describe_copy_from_table(scx, name, columns)?
1841        }
1842        (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1843        (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1844    }
1845    .with_is_copy())
1846}
1847
1848fn plan_copy_to_expr(
1849    scx: &StatementContext,
1850    select_plan: SelectPlan,
1851    desc: RelationDesc,
1852    to: &Expr<Aug>,
1853    format: CopyFormat,
1854    options: CopyOptionExtracted,
1855) -> Result<Plan, PlanError> {
1856    let conn_id = match options.aws_connection {
1857        Some(conn_id) => CatalogItemId::from(conn_id),
1858        None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1859    };
1860    let connection = scx.get_item(&conn_id).connection()?;
1861
1862    match connection {
1863        mz_storage_types::connections::Connection::Aws(_) => {}
1864        _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1865    }
1866
1867    let format = match format {
1868        CopyFormat::Csv => {
1869            let quote = extract_byte_param_value(options.quote, "quote")?;
1870            let escape = extract_byte_param_value(options.escape, "escape")?;
1871            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1872            S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1873                CopyCsvFormatParams::try_new(
1874                    delimiter,
1875                    quote,
1876                    escape,
1877                    options.header,
1878                    options.null,
1879                )
1880                .map_err(|e| sql_err!("{}", e))?,
1881            ))
1882        }
1883        CopyFormat::Parquet => {
1884            // Validate that the output desc can be formatted as parquet
1885            ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
1886            S3SinkFormat::Parquet
1887        }
1888        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1889        CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1890    };
1891
1892    // Converting the to expr to a HirScalarExpr
1893    let mut to_expr = to.clone();
1894    transform_ast::transform(scx, &mut to_expr)?;
1895    let relation_type = RelationDesc::empty();
1896    let ecx = &ExprContext {
1897        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1898        name: "COPY TO target",
1899        scope: &Scope::empty(),
1900        relation_type: relation_type.typ(),
1901        allow_aggregates: false,
1902        allow_subqueries: false,
1903        allow_parameters: false,
1904        allow_windows: false,
1905    };
1906
1907    let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;
1908
1909    if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1910        sql_bail!(
1911            "MAX FILE SIZE cannot be less than {}",
1912            MIN_S3_SINK_FILE_SIZE
1913        );
1914    }
1915    if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1916        sql_bail!(
1917            "MAX FILE SIZE cannot be greater than {}",
1918            MAX_S3_SINK_FILE_SIZE
1919        );
1920    }
1921
1922    Ok(Plan::CopyTo(CopyToPlan {
1923        select_plan,
1924        desc,
1925        to,
1926        connection: connection.to_owned(),
1927        connection_id: conn_id,
1928        format,
1929        max_file_size: options.max_file_size.as_bytes(),
1930    }))
1931}
1932
/// Plans `COPY ... FROM` into a [`Plan::CopyFrom`].
///
/// The source is STDIN or a string expression naming a remote location (a URL,
/// or an S3 URI when an `AWS CONNECTION` option is given). Format parameters
/// are validated per-format, and an optional `FILES`/`PATTERN` filter is
/// resolved (these are mutually exclusive and require a non-STDIN source).
fn plan_copy_from(
    scx: &StatementContext,
    target: &CopyTarget<Aug>,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // Rejects an option that is only meaningful in CSV mode when it is set.
    // NOTE(review): param casing is mixed at call sites ("quote"/"escape" vs
    // "HEADER") — confirm the intended message wording.
    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
        match option {
            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
            None => Ok(()),
        }
    }

    let source = match target {
        CopyTarget::Stdin => CopyFromSource::Stdin,
        CopyTarget::Expr(from) => {
            // Remote COPY FROM is gated behind a feature flag.
            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;

            // Converting the expr to an HirScalarExpr
            let mut from_expr = from.clone();
            transform_ast::transform(scx, &mut from_expr)?;
            let relation_type = RelationDesc::empty();
            // The source expression is planned against an empty scope: it may
            // not reference columns, aggregates, subqueries, parameters, or
            // window functions.
            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "COPY FROM target",
                scope: &Scope::empty(),
                relation_type: relation_type.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;

            // With an AWS connection the expression is an S3 URI; without
            // one it is treated as a plain URL.
            match options.aws_connection {
                Some(conn_id) => {
                    let conn_id = CatalogItemId::from(conn_id);

                    // Validate the connection type is one we expect.
                    let connection = match scx.get_item(&conn_id).connection()? {
                        mz_storage_types::connections::Connection::Aws(conn) => conn,
                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
                    };

                    CopyFromSource::AwsS3 {
                        uri: from,
                        connection,
                        connection_id: conn_id,
                    }
                }
                None => CopyFromSource::Url(from),
            }
        }
        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
    };

    // Resolve per-format parameters, rejecting CSV-only options elsewhere.
    let params = match format {
        CopyFormat::Text => {
            only_available_with_csv(options.quote, "quote")?;
            only_available_with_csv(options.escape, "escape")?;
            only_available_with_csv(options.header, "HEADER")?;
            // TEXT defaults: tab delimiter, `\N` as the NULL sentinel.
            let delimiter =
                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
            let null = match options.null {
                Some(null) => Cow::from(null),
                None => Cow::from("\\N"),
            };
            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
        }
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            )
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Parquet => CopyFormatParams::Parquet,
    };

    // FILES and PATTERN are mutually exclusive filters over the source.
    // NOTE(review): `bail_unsupported!` renders as "<feature> not supported";
    // a plain `sql_bail!` may read better for this case — confirm wording.
    let filter = match (options.files, options.pattern) {
        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
        (Some(files), None) => Some(CopyFromFilter::Files(files)),
        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
        (None, None) => None,
    };

    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
    }

    // Capture the display name before `table_name` is consumed by planning.
    let table_name_string = table_name.full_name_str();

    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;

    // `query::plan_copy_from` is expected to always produce an MFP here;
    // its absence indicates an internal invariant violation.
    let Some(mfp) = maybe_mfp else {
        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
    };

    Ok(Plan::CopyFrom(CopyFromPlan {
        target_id: id,
        target_name: table_name_string,
        source,
        columns,
        source_desc,
        mfp,
        params,
        filter,
    }))
}
2053
2054fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2055    match v {
2056        Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2057        Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2058        None => Ok(None),
2059    }
2060}
2061
// Generates `CopyOptionExtracted`, a typed view over `COPY ... WITH (...)`
// options: format/CSV parameters, an optional AWS connection object, a
// MAX FILE SIZE with a 256 MB default, and the FILES/PATTERN source filters.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
2075
/// Plans a `COPY` statement by dispatching on direction and target:
/// `COPY ... TO STDOUT` (delegates to SELECT/SUBSCRIBE planning),
/// `COPY ... FROM` (delegates to [`plan_copy_from`]), and
/// `COPY ... TO <expr>` (delegates to [`plan_copy_to_expr`]).
pub fn plan_copy(
    scx: &StatementContext,
    CopyStatement {
        relation,
        direction,
        target,
        options,
    }: CopyStatement<Aug>,
) -> Result<Plan, PlanError> {
    let options = CopyOptionExtracted::try_from(options)?;
    // Parse any user-provided FORMAT option. If not provided, will default to
    // Text for COPY TO STDOUT and COPY FROM STDIN, but will error for COPY TO <expr>.
    let format = options
        .format
        .as_ref()
        .map(|format| match format.to_lowercase().as_str() {
            "text" => Ok(CopyFormat::Text),
            "csv" => Ok(CopyFormat::Csv),
            "binary" => Ok(CopyFormat::Binary),
            "parquet" => Ok(CopyFormat::Parquet),
            _ => sql_bail!("unknown FORMAT: {}", format),
        })
        .transpose()?;

    match (&direction, &target) {
        (CopyDirection::To, CopyTarget::Stdout) => {
            // These CSV-style options are not implemented for COPY TO STDOUT.
            if options.delimiter.is_some() {
                sql_bail!("COPY TO does not support DELIMITER option yet");
            }
            if options.quote.is_some() {
                sql_bail!("COPY TO does not support QUOTE option yet");
            }
            if options.null.is_some() {
                sql_bail!("COPY TO does not support NULL option yet");
            }
            // COPY TO STDOUT wraps a SELECT or SUBSCRIBE plan, passing the
            // (defaulted) format through as the copy format.
            match relation {
                CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
                CopyRelation::Select(stmt) => Ok(plan_select(
                    scx,
                    stmt,
                    &Params::empty(),
                    Some(format.unwrap_or(CopyFormat::Text)),
                )?),
                CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
                    scx,
                    stmt,
                    &Params::empty(),
                    Some(format.unwrap_or(CopyFormat::Text)),
                )?),
            }
        }
        (CopyDirection::From, target) => match relation {
            CopyRelation::Named { name, columns } => plan_copy_from(
                scx,
                target,
                name,
                columns,
                format.unwrap_or(CopyFormat::Text),
                options,
            ),
            _ => sql_bail!("COPY FROM {} not supported", target),
        },
        (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
            // System users are always allowed to use this feature, even when
            // the flag is disabled, so that we can dogfood for analytics in
            // production environments. The feature is stable enough that we're
            // not worried about it crashing.
            if !scx.catalog.active_role_id().is_system() {
                scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
            }

            // Unlike STDOUT, COPY TO <expr> has no default format.
            let format = match format {
                Some(inner) => inner,
                _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
            };

            // Normalize the relation into a SELECT statement to plan.
            let stmt = match relation {
                CopyRelation::Named { name, columns } => {
                    if !columns.is_empty() {
                        // TODO(mouli): Add support for this
                        sql_bail!(
                            "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
                        );
                    }
                    // Generate a synthetic SELECT query that just gets the table
                    let query = Query {
                        ctes: CteBlock::empty(),
                        body: SetExpr::Table(name),
                        order_by: vec![],
                        limit: None,
                        offset: None,
                    };
                    SelectStatement { query, as_of: None }
                }
                CopyRelation::Select(stmt) => {
                    if !stmt.query.order_by.is_empty() {
                        sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
                    }
                    stmt
                }
                _ => sql_bail!("COPY {} {} not supported", direction, target),
            };

            let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
            plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
        }
        _ => sql_bail!("COPY {} {} not supported", direction, target),
    }
}