Skip to main content

mz_sql/plan/statement/
dml.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data manipulation language (DML).
11//!
12//! This module houses the handlers for statements that manipulate data, like
13//! `INSERT`, `SELECT`, `SUBSCRIBE`, and `COPY`.
14
15use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19use mz_arrow_util::builder::ArrowBuilder;
20use mz_expr::{ColumnOrder, RowSetFinishing};
21use mz_ore::num::NonNeg;
22use mz_ore::soft_panic_or_log;
23use mz_ore::str::separated;
24use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
25use mz_repr::adt::numeric::NumericMaxScale;
26use mz_repr::bytes::ByteSize;
27use mz_repr::explain::{ExplainConfig, ExplainFormat};
28use mz_repr::optimize::OptimizerFeatureOverrides;
29use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
30use mz_sql_parser::ast::{
31    CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
32    ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
33    ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
34    ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
35    SetExpr, SubscribeOutput, UnresolvedItemName,
36};
37use mz_sql_parser::ident;
38use mz_storage_types::sinks::{
39    KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
40    MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
41};
42
43use crate::ast::display::{AstDisplay, escaped_string_literal};
44use crate::ast::{
45    AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
46    DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
47    SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
48    UpdateStatement,
49};
50use crate::catalog::CatalogItemType;
51use crate::names::{Aug, ResolvedItemName};
52use crate::normalize;
53use crate::plan::query::{
54    ExprContext, QueryLifetime, negative_offset_error, offset_into_value, plan_as_of_or_up_to,
55    plan_expr,
56};
57use crate::plan::scope::Scope;
58use crate::plan::statement::show::ShowSelect;
59use crate::plan::statement::{StatementContext, StatementDesc, ddl};
60use crate::plan::{
61    self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
62    ExplainTimestampPlan, HirRelationExpr, side_effecting_func, transform_ast,
63};
64use crate::plan::{
65    CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
66    QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
67};
68use crate::plan::{CopyFromSource, with_options};
69use crate::session::vars::{self, DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF};
70
71// TODO(benesch): currently, describing a `SELECT` or `INSERT` query
72// plans the whole query to determine its shape and parameter types,
73// and then throws away that plan. If we were smarter, we'd stash that
74// plan somewhere so we don't have to recompute it when the query is
75// executed.
76
77pub fn describe_insert(
78    scx: &StatementContext,
79    InsertStatement {
80        table_name,
81        columns,
82        source,
83        returning,
84    }: InsertStatement<Aug>,
85) -> Result<StatementDesc, PlanError> {
86    let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
87    let desc = if returning.expr.is_empty() {
88        None
89    } else {
90        Some(returning.desc)
91    };
92    Ok(StatementDesc::new(desc))
93}
94
95pub fn plan_insert(
96    scx: &StatementContext,
97    InsertStatement {
98        table_name,
99        columns,
100        source,
101        returning,
102    }: InsertStatement<Aug>,
103    params: &Params,
104) -> Result<Plan, PlanError> {
105    let (id, mut expr, returning) =
106        query::plan_insert_query(scx, table_name, columns, source, returning)?;
107    expr.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
108    let returning = returning
109        .expr
110        .into_iter()
111        .map(|mut expr| {
112            expr.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
113            expr.lower_uncorrelated(scx.catalog.system_vars())
114        })
115        .collect::<Result<Vec<_>, _>>()?;
116
117    Ok(Plan::Insert(InsertPlan {
118        id,
119        values: expr,
120        returning,
121    }))
122}
123
124pub fn describe_delete(
125    scx: &StatementContext,
126    stmt: DeleteStatement<Aug>,
127) -> Result<StatementDesc, PlanError> {
128    query::plan_delete_query(scx, stmt)?;
129    Ok(StatementDesc::new(None))
130}
131
132pub fn plan_delete(
133    scx: &StatementContext,
134    stmt: DeleteStatement<Aug>,
135    params: &Params,
136) -> Result<Plan, PlanError> {
137    let rtw_plan = query::plan_delete_query(scx, stmt)?;
138    plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
139}
140
141pub fn describe_update(
142    scx: &StatementContext,
143    stmt: UpdateStatement<Aug>,
144) -> Result<StatementDesc, PlanError> {
145    query::plan_update_query(scx, stmt)?;
146    Ok(StatementDesc::new(None))
147}
148
149pub fn plan_update(
150    scx: &StatementContext,
151    stmt: UpdateStatement<Aug>,
152    params: &Params,
153) -> Result<Plan, PlanError> {
154    let rtw_plan = query::plan_update_query(scx, stmt)?;
155    plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
156}
157
158pub fn plan_read_then_write(
159    scx: &StatementContext,
160    kind: MutationKind,
161    params: &Params,
162    query::ReadThenWritePlan {
163        id,
164        mut selection,
165        finishing,
166        assignments,
167    }: query::ReadThenWritePlan,
168) -> Result<Plan, PlanError> {
169    selection.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
170    let mut assignments_outer = BTreeMap::new();
171    for (idx, mut set) in assignments {
172        set.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
173        let set = set.lower_uncorrelated(scx.catalog.system_vars())?;
174        assignments_outer.insert(idx, set);
175    }
176
177    Ok(Plan::ReadThenWrite(ReadThenWritePlan {
178        id,
179        selection,
180        finishing,
181        assignments: assignments_outer,
182        kind,
183        returning: Vec::new(),
184    }))
185}
186
187pub fn describe_select(
188    scx: &StatementContext,
189    stmt: SelectStatement<Aug>,
190) -> Result<StatementDesc, PlanError> {
191    if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
192        return Ok(StatementDesc::new(Some(desc)));
193    }
194
195    let query::PlannedRootQuery { desc, .. } =
196        query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
197    Ok(StatementDesc::new(Some(desc)))
198}
199
200pub fn plan_select(
201    scx: &StatementContext,
202    select: SelectStatement<Aug>,
203    params: &Params,
204    copy_to: Option<CopyFormat>,
205) -> Result<Plan, PlanError> {
206    if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
207        return Ok(Plan::SideEffectingFunc(f));
208    }
209
210    let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
211    Ok(Plan::Select(plan))
212}
213
214fn plan_select_inner(
215    scx: &StatementContext,
216    select: SelectStatement<Aug>,
217    params: &Params,
218    copy_to: Option<CopyFormat>,
219) -> Result<(SelectPlan, RelationDesc), PlanError> {
220    let when = query::plan_as_of(scx, select.as_of.clone())?;
221    let lifetime = QueryLifetime::OneShot;
222    let query::PlannedRootQuery {
223        mut expr,
224        desc,
225        finishing,
226        scope: _,
227    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
228    expr.bind_parameters_and_simplify_offset(scx, lifetime, params)?;
229
230    // We need to concretize the `limit` and `offset` of the RowSetFinishing, so that we go from
231    // `RowSetFinishing<HirScalarExpr, HirScalarExpr>` to `RowSetFinishing`.
232    // This involves binding parameters and evaluating each expression to a number.
233    // (This should be possible even for `limit` here, because we are at the top level of a SELECT,
234    // so this `limit` has to be a constant.)
235    let limit = match finishing.limit {
236        None => None,
237        Some(mut limit) => {
238            limit.bind_parameters_and_simplify_offset(scx, lifetime, params)?;
239            // TODO: Call `try_into_literal_int64` instead of `as_literal`.
240            let Some(limit) = limit.as_literal() else {
241                sql_bail!(
242                    "Top-level LIMIT must be a constant expression, got {}",
243                    limit
244                )
245            };
246            match limit {
247                Datum::Null => None,
248                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
249                _ => {
250                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
251                    sql_bail!("LIMIT must be a non-negative INT or NULL")
252                }
253            }
254        }
255    };
256    let offset = {
257        let mut offset = finishing.offset.clone();
258        offset.bind_parameters_and_simplify_offset(scx, lifetime, params)?;
259        let offset = offset_into_value(offset.take())?;
260        offset.try_into().map_err(|_| {
261            // We already checked in bind_parameters_and_simplify_offset / offset_into_value.
262            soft_panic_or_log!("unexpectedly negative OFFSET");
263            negative_offset_error(offset)
264        })?
265    };
266
267    // Unmaterializable functions are evaluated based on various information (e.g., the catalog)
268    // during sequencing, without taking AS OF into account. This would be hard to fix, so for now
269    // we just disallow AS OF when there is an unmaterializable function in a query (except mz_now).
270    if scx.is_feature_flag_enabled(&DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF)
271        && select.as_of.is_some()
272        && expr.contains_unmaterializable_except_temporal()
273    {
274        bail_unsupported!("unmaterializable function (except `mz_now`) in an AS OF query");
275    }
276
277    let plan = SelectPlan {
278        source: expr,
279        when,
280        finishing: RowSetFinishing {
281            limit,
282            offset,
283            project: finishing.project,
284            order_by: finishing.order_by,
285        },
286        copy_to,
287        select: Some(Box::new(select)),
288    };
289
290    Ok((plan, desc))
291}
292
293pub fn describe_explain_plan(
294    scx: &StatementContext,
295    explain: ExplainPlanStatement<Aug>,
296) -> Result<StatementDesc, PlanError> {
297    let mut relation_desc = RelationDesc::builder();
298
299    match explain.stage() {
300        ExplainStage::RawPlan => {
301            let name = "Raw Plan";
302            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
303        }
304        ExplainStage::DecorrelatedPlan => {
305            let name = "Decorrelated Plan";
306            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
307        }
308        ExplainStage::LocalPlan => {
309            let name = "Locally Optimized Plan";
310            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
311        }
312        ExplainStage::GlobalPlan => {
313            let name = "Optimized Plan";
314            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
315        }
316        ExplainStage::PhysicalPlan => {
317            let name = "Physical Plan";
318            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
319        }
320        ExplainStage::Trace => {
321            relation_desc = relation_desc
322                .with_column("Time", SqlScalarType::UInt64.nullable(false))
323                .with_column("Path", SqlScalarType::String.nullable(false))
324                .with_column("Plan", SqlScalarType::String.nullable(false));
325        }
326        ExplainStage::PlanInsights => {
327            let name = "Plan Insights";
328            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
329        }
330    };
331    let relation_desc = relation_desc.finish();
332
333    Ok(
334        StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
335            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
336            _ => vec![],
337        }),
338    )
339}
340
341pub fn describe_explain_pushdown(
342    scx: &StatementContext,
343    statement: ExplainPushdownStatement<Aug>,
344) -> Result<StatementDesc, PlanError> {
345    let relation_desc = RelationDesc::builder()
346        .with_column("Source", SqlScalarType::String.nullable(false))
347        .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
348        .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
349        .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
350        .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
351        .finish();
352
353    Ok(
354        StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
355            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
356            _ => vec![],
357        }),
358    )
359}
360
361pub fn describe_explain_analyze_object(
362    _scx: &StatementContext,
363    statement: ExplainAnalyzeObjectStatement<Aug>,
364) -> Result<StatementDesc, PlanError> {
365    if statement.as_sql {
366        let relation_desc = RelationDesc::builder()
367            .with_column("SQL", SqlScalarType::String.nullable(false))
368            .finish();
369        return Ok(StatementDesc::new(Some(relation_desc)));
370    }
371
372    match statement.properties {
373        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
374            properties,
375            skew,
376        }) => {
377            let mut relation_desc = RelationDesc::builder()
378                .with_column("operator", SqlScalarType::String.nullable(false));
379
380            if skew {
381                relation_desc =
382                    relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
383            }
384
385            let mut seen_properties = BTreeSet::new();
386            for property in properties {
387                // handle each property only once (belt and suspenders)
388                if !seen_properties.insert(property) {
389                    continue;
390                }
391
392                match property {
393                    ExplainAnalyzeComputationProperty::Memory if skew => {
394                        let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
395                        relation_desc = relation_desc
396                            .with_column("memory_ratio", numeric.clone())
397                            .with_column("worker_memory", SqlScalarType::String.nullable(true))
398                            .with_column("avg_memory", SqlScalarType::String.nullable(true))
399                            .with_column("total_memory", SqlScalarType::String.nullable(true))
400                            .with_column("records_ratio", numeric.clone())
401                            .with_column("worker_records", numeric.clone())
402                            .with_column("avg_records", numeric.clone())
403                            .with_column("total_records", numeric);
404                    }
405                    ExplainAnalyzeComputationProperty::Memory => {
406                        relation_desc = relation_desc
407                            .with_column("total_memory", SqlScalarType::String.nullable(true))
408                            .with_column(
409                                "total_records",
410                                SqlScalarType::Numeric { max_scale: None }.nullable(true),
411                            );
412                    }
413                    ExplainAnalyzeComputationProperty::Cpu => {
414                        if skew {
415                            relation_desc = relation_desc
416                                .with_column(
417                                    "cpu_ratio",
418                                    SqlScalarType::Numeric { max_scale: None }.nullable(true),
419                                )
420                                .with_column(
421                                    "worker_elapsed",
422                                    SqlScalarType::Interval.nullable(true),
423                                )
424                                .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
425                        }
426                        relation_desc = relation_desc
427                            .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
428                    }
429                }
430            }
431
432            let relation_desc = relation_desc.finish();
433            Ok(StatementDesc::new(Some(relation_desc)))
434        }
435        ExplainAnalyzeProperty::Hints => {
436            let relation_desc = RelationDesc::builder()
437                .with_column("operator", SqlScalarType::String.nullable(true))
438                .with_column("levels", SqlScalarType::Int64.nullable(true))
439                .with_column("to_cut", SqlScalarType::Int64.nullable(true))
440                .with_column("hint", SqlScalarType::Float64.nullable(true))
441                .with_column("savings", SqlScalarType::String.nullable(true))
442                .finish();
443            Ok(StatementDesc::new(Some(relation_desc)))
444        }
445    }
446}
447
448pub fn describe_explain_analyze_cluster(
449    _scx: &StatementContext,
450    statement: ExplainAnalyzeClusterStatement,
451) -> Result<StatementDesc, PlanError> {
452    if statement.as_sql {
453        let relation_desc = RelationDesc::builder()
454            .with_column("SQL", SqlScalarType::String.nullable(false))
455            .finish();
456        return Ok(StatementDesc::new(Some(relation_desc)));
457    }
458
459    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
460
461    let mut relation_desc = RelationDesc::builder()
462        .with_column("object", SqlScalarType::String.nullable(false))
463        .with_column("global_id", SqlScalarType::String.nullable(false));
464
465    if skew {
466        relation_desc =
467            relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
468    }
469
470    let mut seen_properties = BTreeSet::new();
471    for property in properties {
472        // handle each property only once (belt and suspenders)
473        if !seen_properties.insert(property) {
474            continue;
475        }
476
477        match property {
478            ExplainAnalyzeComputationProperty::Memory if skew => {
479                let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
480                relation_desc = relation_desc
481                    .with_column("max_operator_memory_ratio", numeric.clone())
482                    .with_column("worker_memory", SqlScalarType::String.nullable(true))
483                    .with_column("avg_memory", SqlScalarType::String.nullable(true))
484                    .with_column("total_memory", SqlScalarType::String.nullable(true))
485                    .with_column("max_operator_records_ratio", numeric.clone())
486                    .with_column("worker_records", numeric.clone())
487                    .with_column("avg_records", numeric.clone())
488                    .with_column("total_records", numeric);
489            }
490            ExplainAnalyzeComputationProperty::Memory => {
491                relation_desc = relation_desc
492                    .with_column("total_memory", SqlScalarType::String.nullable(true))
493                    .with_column(
494                        "total_records",
495                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
496                    );
497            }
498            ExplainAnalyzeComputationProperty::Cpu if skew => {
499                relation_desc = relation_desc
500                    .with_column(
501                        "max_operator_cpu_ratio",
502                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
503                    )
504                    .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
505                    .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
506                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
507            }
508            ExplainAnalyzeComputationProperty::Cpu => {
509                relation_desc = relation_desc
510                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
511            }
512        }
513    }
514
515    Ok(StatementDesc::new(Some(relation_desc.finish())))
516}
517
518pub fn describe_explain_timestamp(
519    scx: &StatementContext,
520    ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
521) -> Result<StatementDesc, PlanError> {
522    let relation_desc = RelationDesc::builder()
523        .with_column("Timestamp", SqlScalarType::String.nullable(false))
524        .finish();
525
526    Ok(StatementDesc::new(Some(relation_desc))
527        .with_params(describe_select(scx, select)?.param_types))
528}
529
530pub fn describe_explain_schema(
531    _: &StatementContext,
532    ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
533) -> Result<StatementDesc, PlanError> {
534    let relation_desc = RelationDesc::builder()
535        .with_column("Schema", SqlScalarType::String.nullable(false))
536        .finish();
537    Ok(StatementDesc::new(Some(relation_desc)))
538}
539
540// Currently, there are two reasons for why a flag should be `Option<bool>` instead of simply
541// `bool`:
542// - When it's an override of a global feature flag, for example optimizer feature flags. In this
543//   case, we need not just false and true, but also None to say "take the value of the global
544//   flag".
545// - When it's an override of whether SOFT_ASSERTIONS are enabled. For example, when `Arity` is not
546//   explicitly given in the EXPLAIN command, then we'd like staging and prod to default to true,
547//   but otherwise we'd like to default to false.
548generate_extracted_config!(
549    ExplainPlanOption,
550    (Arity, Option<bool>, Default(None)),
551    (Cardinality, bool, Default(false)),
552    (ColumnNames, bool, Default(false)),
553    (FilterPushdown, Option<bool>, Default(None)),
554    (HumanizedExpressions, Option<bool>, Default(None)),
555    (JoinImplementations, bool, Default(false)),
556    (Keys, bool, Default(false)),
557    (LinearChains, bool, Default(false)),
558    (NoFastPath, bool, Default(false)),
559    (NonNegative, bool, Default(false)),
560    (NoNotices, bool, Default(false)),
561    (NodeIdentifiers, bool, Default(false)),
562    (Raw, bool, Default(false)),
563    (RawPlans, bool, Default(false)),
564    (RawSyntax, bool, Default(false)),
565    (Redacted, bool, Default(false)),
566    (SubtreeSize, bool, Default(false)),
567    (Timing, bool, Default(false)),
568    (Types, bool, Default(false)),
569    (Equivalences, bool, Default(false)),
570    (ReoptimizeImportedViews, Option<bool>, Default(None)),
571    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
572    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
573    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
574    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
575    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
576    (
577        EnableProjectionPushdownAfterRelationCse,
578        Option<bool>,
579        Default(None)
580    )
581);
582
583impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
584    type Error = PlanError;
585
586    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
587        // If `WITH(raw)` is specified, ensure that the config will be as
588        // representative for the original plan as possible.
589        if v.raw {
590            v.raw_plans = true;
591            v.raw_syntax = true;
592        }
593
594        // Certain config should default to be enabled in release builds running on
595        // staging or prod (where SOFT_ASSERTIONS are turned off).
596        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();
597
598        Ok(ExplainConfig {
599            arity: v.arity.unwrap_or(enable_on_prod),
600            cardinality: v.cardinality,
601            column_names: v.column_names,
602            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
603            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
604            join_impls: v.join_implementations,
605            keys: v.keys,
606            linear_chains: !v.raw_plans && v.linear_chains,
607            no_fast_path: v.no_fast_path,
608            no_notices: v.no_notices,
609            node_ids: v.node_identifiers,
610            non_negative: v.non_negative,
611            raw_plans: v.raw_plans,
612            raw_syntax: v.raw_syntax,
613            verbose_syntax: false,
614            redacted: v.redacted,
615            subtree_size: v.subtree_size,
616            equivalences: v.equivalences,
617            timing: v.timing,
618            types: v.types,
619            // The ones that are initialized with `Default::default()` are not wired up to EXPLAIN.
620            features: OptimizerFeatureOverrides {
621                enable_eager_delta_joins: v.enable_eager_delta_joins,
622                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
623                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
624                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
625                enable_reduce_mfp_fusion: Default::default(),
626                enable_cardinality_estimates: Default::default(),
627                persist_fast_path_limit: Default::default(),
628                reoptimize_imported_views: v.reoptimize_imported_views,
629                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
630                enable_projection_pushdown_after_relation_cse: v
631                    .enable_projection_pushdown_after_relation_cse,
632                enable_less_reduce_in_eqprop: Default::default(),
633                enable_dequadratic_eqprop_map: Default::default(),
634                enable_eq_classes_withholding_errors: Default::default(),
635                enable_fast_path_plan_insights: Default::default(),
636                enable_cast_elimination: Default::default(),
637                enable_case_literal_transform: Default::default(),
638                enable_simplify_quantified_comparisons: Default::default(),
639                enable_coalesce_case_transform: Default::default(),
640                enable_will_distinct_propagation: Default::default(),
641            },
642        })
643    }
644}
645
646fn plan_explainee(
647    scx: &StatementContext,
648    explainee: Explainee<Aug>,
649    params: &Params,
650) -> Result<plan::Explainee, PlanError> {
651    use crate::plan::ExplaineeStatement;
652
653    let is_replan = matches!(
654        explainee,
655        Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
656    );
657
658    let explainee = match explainee {
659        Explainee::View(name) | Explainee::ReplanView(name) => {
660            let item = scx.get_item_by_resolved_name(&name)?;
661            let item_type = item.item_type();
662            if item_type != CatalogItemType::View {
663                sql_bail!("Expected {name} to be a view, not a {item_type}");
664            }
665            match is_replan {
666                true => crate::plan::Explainee::ReplanView(item.id()),
667                false => crate::plan::Explainee::View(item.id()),
668            }
669        }
670        Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
671            let item = scx.get_item_by_resolved_name(&name)?;
672            let item_type = item.item_type();
673            if item_type != CatalogItemType::MaterializedView {
674                sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
675            }
676            match is_replan {
677                true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
678                false => crate::plan::Explainee::MaterializedView(item.id()),
679            }
680        }
681        Explainee::Index(name) | Explainee::ReplanIndex(name) => {
682            let item = scx.get_item_by_resolved_name(&name)?;
683            let item_type = item.item_type();
684            if item_type != CatalogItemType::Index {
685                sql_bail!("Expected {name} to be an index, not a {item_type}");
686            }
687            match is_replan {
688                true => crate::plan::Explainee::ReplanIndex(item.id()),
689                false => crate::plan::Explainee::Index(item.id()),
690            }
691        }
692        Explainee::Select(select, broken) => {
693            let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
694            crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
695        }
696        Explainee::CreateView(mut stmt, broken) => {
697            if stmt.if_exists != IfExistsBehavior::Skip {
698                // If we don't force this parameter to Skip planning will
699                // fail for names that already exist in the catalog. This
700                // can happen even in `Replace` mode if the existing item
701                // has dependencies.
702                stmt.if_exists = IfExistsBehavior::Skip;
703            } else {
704                sql_bail!(
705                    "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
706                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
707                );
708            }
709
710            let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
711                sql_bail!("expected CreateViewPlan plan");
712            };
713
714            crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
715        }
716        Explainee::CreateMaterializedView(mut stmt, broken) => {
717            if stmt.if_exists != IfExistsBehavior::Skip {
718                // If we don't force this parameter to Skip planning will
719                // fail for names that already exist in the catalog. This
720                // can happen even in `Replace` mode if the existing item
721                // has dependencies.
722                stmt.if_exists = IfExistsBehavior::Skip;
723            } else {
724                sql_bail!(
725                    "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
726                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
727                );
728            }
729
730            let Plan::CreateMaterializedView(plan) =
731                ddl::plan_create_materialized_view(scx, *stmt)?
732            else {
733                sql_bail!("expected CreateMaterializedViewPlan plan");
734            };
735
736            crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
737                broken,
738                plan,
739            })
740        }
741        Explainee::CreateIndex(mut stmt, broken) => {
742            if !stmt.if_not_exists {
743                // If we don't force this parameter to true planning will
744                // fail for index items that already exist in the catalog.
745                stmt.if_not_exists = true;
746            } else {
747                sql_bail!(
748                    "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
749                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
750                );
751            }
752
753            let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
754                sql_bail!("expected CreateIndexPlan plan");
755            };
756
757            crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
758        }
759        Explainee::Subscribe(stmt, broken) => {
760            let Plan::Subscribe(plan) = plan_subscribe(scx, *stmt, params, None)? else {
761                sql_bail!("expected SubscribePlan");
762            };
763            crate::plan::Explainee::Statement(ExplaineeStatement::Subscribe { broken, plan })
764        }
765    };
766
767    Ok(explainee)
768}
769
770pub fn plan_explain_plan(
771    scx: &StatementContext,
772    explain: ExplainPlanStatement<Aug>,
773    params: &Params,
774) -> Result<Plan, PlanError> {
775    let (format, verbose_syntax) = match explain.format() {
776        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
777        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
778        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
779        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
780    };
781    let stage = explain.stage();
782
783    // Plan ExplainConfig.
784    let mut config = {
785        let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
786
787        if !scx.catalog.system_vars().persist_stats_filter_enabled() {
788            // If filtering is disabled, explain plans should not include pushdown info.
789            with_options.filter_pushdown = Some(false);
790        }
791
792        ExplainConfig::try_from(with_options)?
793    };
794    config.verbose_syntax = verbose_syntax;
795
796    let explainee = plan_explainee(scx, explain.explainee, params)?;
797
798    Ok(Plan::ExplainPlan(ExplainPlanPlan {
799        stage,
800        format,
801        config,
802        explainee,
803    }))
804}
805
806pub fn plan_explain_schema(
807    scx: &StatementContext,
808    explain_schema: ExplainSinkSchemaStatement<Aug>,
809) -> Result<Plan, PlanError> {
810    let ExplainSinkSchemaStatement {
811        schema_for,
812        // Parser limits to JSON.
813        format: _,
814        mut statement,
815    } = explain_schema;
816
817    // Force the sink's name to one that's guaranteed not to exist, by virtue of
818    // being a non-existent item in a schema under the system's control, so that
819    // `plan_create_sink` doesn't complain about the name already existing.
820    statement.name = Some(UnresolvedItemName::qualified(&[
821        ident!("mz_catalog"),
822        ident!("mz_explain_schema"),
823    ]));
824
825    crate::pure::purify_create_sink_avro_doc_on_options(
826        scx.catalog,
827        *statement.from.item_id(),
828        &mut statement.format,
829    )?;
830
831    match ddl::plan_create_sink(scx, statement)? {
832        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
833            StorageSinkConnection::Kafka(KafkaSinkConnection {
834                format:
835                    KafkaSinkFormat {
836                        key_format,
837                        value_format:
838                            KafkaSinkFormatType::Avro {
839                                schema: value_schema,
840                                ..
841                            },
842                        ..
843                    },
844                ..
845            }) => {
846                let schema = match schema_for {
847                    ExplainSinkSchemaFor::Key => key_format
848                        .and_then(|f| match f {
849                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
850                            _ => None,
851                        })
852                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
853                    ExplainSinkSchemaFor::Value => value_schema,
854                };
855
856                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
857                    sink_from: sink.from,
858                    json_schema: schema,
859                }))
860            }
861            _ => bail_unsupported!(
862                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
863            ),
864        },
865        _ => bail_internal!("plan_sink did not produce a CreateSink plan"),
866    }
867}
868
869pub fn plan_explain_pushdown(
870    scx: &StatementContext,
871    statement: ExplainPushdownStatement<Aug>,
872    params: &Params,
873) -> Result<Plan, PlanError> {
874    scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
875    let explainee = plan_explainee(scx, statement.explainee, params)?;
876    Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
877}
878
/// Plans an `EXPLAIN ANALYZE` statement that targets a single named object.
///
/// The statement is answered by assembling a SQL query over
/// `mz_introspection` relations (rooted at `mz_lir_mapping`). When
/// `statement.as_sql` is set, the pretty-printed query text is returned as a
/// single-row result; otherwise the query is planned via
/// `ShowSelect::new_from_bare_query`.
///
/// # Errors
///
/// Returns an error if:
/// * the explainee has no name (`statement.explainee.name()` is `None`),
/// * `plan_explainee` fails,
/// * the planned explainee is neither an index nor a materialized view,
/// * `scx.catalog.restrict_to_user_objects()` is true and the explainee's
///   owner differs from the active role, or
/// * the generated query cannot be pretty-printed or planned.
pub fn plan_explain_analyze_object(
    scx: &StatementContext,
    statement: ExplainAnalyzeObjectStatement<Aug>,
    params: &Params,
) -> Result<Plan, PlanError> {
    // Capture the name before `statement.explainee` is moved into
    // `plan_explainee`; it is needed below for the `mo.name` predicate.
    let explainee_name = statement
        .explainee
        .name()
        .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
        .full_name_str();
    let explainee = plan_explainee(scx, statement.explainee, params)?;

    // Ownership is only enforced when the catalog reports
    // `restrict_to_user_objects()`; otherwise this check is a no-op.
    let check_ownership = |item_id: &CatalogItemId, item_type: &str| -> Result<(), PlanError> {
        if scx.catalog.restrict_to_user_objects() {
            let item = scx.catalog.get_item(item_id);
            if item.owner_id() != *scx.catalog.active_role_id() {
                let full_name = scx.catalog.resolve_full_name(item.name());
                return Err(sql_err!("must be owner of {item_type} {full_name}"));
            }
        }
        Ok(())
    };
    // Only indexes and materialized views are accepted; every other kind of
    // explainee is rejected.
    match &explainee {
        plan::Explainee::Index(item_id) => check_ownership(item_id, "INDEX")?,
        plan::Explainee::MaterializedView(item_id) => {
            check_ownership(item_id, "MATERIALIZED VIEW")?
        }
        _ => return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",)),
    };

    // generate SQL query

    /* WITH {CTEs}
       SELECT REPEAT(' ', nesting * 2) || operator AS operator
             {columns}
        FROM      mz_introspection.mz_lir_mapping mlm
             {property-specific joins}
             JOIN mz_introspection.mz_mappable_objects mo
               ON (mlm.global_id = mo.global_id)
       WHERE     mo.name = {escaped explainee_name}
             AND {predicates}
       ORDER BY mlm.lir_id DESC {, worker_id}
    */
    // The query is accumulated as fragments (CTEs, select list, join list,
    // predicates, ordering) that are stitched together at the end.
    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
    let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
    let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
    // The object name is interpolated as an escaped string literal.
    let mut predicates = vec![format!(
        "mo.name = {}",
        escaped_string_literal(&explainee_name)
    )];
    let mut order_by = vec!["mlm.lir_id DESC"];

    match statement.properties {
        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
            properties,
            skew,
        }) => {
            // Remembers the first per-worker column added when `skew` is set.
            // A later property then constrains its own worker column to match
            // it instead of adding a second `worker_id` output column.
            let mut worker_id = None;
            let mut seen_properties = BTreeSet::new();
            for property in properties {
                // handle each property only once (belt and suspenders)
                if !seen_properties.insert(property) {
                    continue;
                }

                match property {
                    ExplainAnalyzeComputationProperty::Memory => {
                        // Per-operator totals and per-worker averages of
                        // arrangement size and record counts.
                        ctes.push((
                            "summary_memory",
                            r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         SUM(mas.size) AS total_memory,
         SUM(mas.records) AS total_records,
         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
                 ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                        ));
                        from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");

                        if skew {
                            // Same aggregation, but additionally broken down
                            // by worker so ratios against the average can be
                            // reported.
                            ctes.push((
                                "per_worker_memory",
                                r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         mas.worker_id AS worker_id,
         SUM(mas.size) AS worker_memory,
         SUM(mas.records) AS worker_records
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
                 ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                            ));
                            from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");

                            if let Some(worker_id) = worker_id {
                                // Another property already exposes a worker
                                // column: line this one up with it, tolerating
                                // NULLs on either side of the LEFT JOINs.
                                predicates.push(format!(
                                    "(pwm.worker_id = {worker_id} OR pwm.worker_id IS NULL OR {worker_id} IS NULL)"
                                ));
                            } else {
                                // First per-worker property: it provides the
                                // `worker_id` output column and sort key.
                                worker_id = Some("pwm.worker_id");
                                columns.push("pwm.worker_id AS worker_id");
                                order_by.push("worker_id");
                            }

                            columns.extend([
                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
                                "pg_size_pretty(pwm.worker_memory) AS worker_memory",
                                "pg_size_pretty(sm.avg_memory) AS avg_memory",
                                "pg_size_pretty(sm.total_memory) AS total_memory",
                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
                                "pwm.worker_records AS worker_records",
                                "sm.avg_records AS avg_records",
                                "sm.total_records AS total_records",
                            ]);
                        } else {
                            // Without skew only the totals are reported.
                            columns.extend([
                                "pg_size_pretty(sm.total_memory) AS total_memory",
                                "sm.total_records AS total_records",
                            ]);
                        }
                    }
                    ExplainAnalyzeComputationProperty::Cpu => {
                        // Per-operator total and per-worker average of
                        // elapsed scheduling time.
                        ctes.push((
                            "summary_cpu",
                            r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         SUM(mse.elapsed_ns) AS total_ns,
         CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
                 ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                        ));
                        from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");

                        if skew {
                            // Same aggregation, but additionally broken down
                            // by worker.
                            ctes.push((
                                "per_worker_cpu",
                                r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         mse.worker_id AS worker_id,
         SUM(mse.elapsed_ns) AS worker_ns
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
                 ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                            ));
                            from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");

                            if let Some(worker_id) = worker_id {
                                // Line this worker column up with the one that
                                // an earlier property already exposed.
                                predicates.push(format!(
                                    "(pwc.worker_id = {worker_id} OR pwc.worker_id IS NULL OR {worker_id} IS NULL)"
                                ));
                            } else {
                                // First per-worker property: it provides the
                                // `worker_id` output column and sort key.
                                worker_id = Some("pwc.worker_id");
                                columns.push("pwc.worker_id AS worker_id");
                                order_by.push("worker_id");
                            }

                            columns.extend([
                                "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
                                "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
                                "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
                            ]);
                        }
                        // The total is reported with or without skew.
                        columns.push(
                            "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
                        );
                    }
                }
            }
        }
        ExplainAnalyzeProperty::Hints => {
            // Hints come straight from `mz_expected_group_size_advice`; no
            // CTEs are needed for this property.
            columns.extend([
                "megsa.levels AS levels",
                "megsa.to_cut AS to_cut",
                "megsa.hint AS hint",
                "pg_size_pretty(megsa.savings) AS savings",
            ]);
            from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
            "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
             mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
        }
    }

    // Pushed last so it follows every property-specific join; `mo` supplies
    // the name that the first predicate filters on.
    from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");

    // The `WITH` clause is omitted entirely when no CTEs were collected.
    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    let predicates = separated(" AND ", predicates);
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
WHERE {predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // `AS SQL`: return the pretty-printed query text as a single row
        // instead of running it.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        // Otherwise plan the generated query and record the ids it resolved.
        let (show_select, resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        scx.record_sql_impl_ids(&resolved_ids);
        show_select.plan()
    }
}
1116
1117pub fn plan_explain_analyze_cluster(
1118    scx: &StatementContext,
1119    statement: ExplainAnalyzeClusterStatement,
1120    _params: &Params,
1121) -> Result<Plan, PlanError> {
1122    // object string
1123    // worker_id uint64        (if           skew)
1124    // memory_ratio numeric    (if memory && skew)
1125    // worker_memory string    (if memory && skew)
1126    // avg_memory string       (if memory && skew)
1127    // total_memory string     (if memory)
1128    // records_ratio numeric   (if memory && skew)
1129    // worker_records          (if memory && skew)
1130    // avg_records numeric     (if memory && skew)
1131    // total_records numeric   (if memory)
1132    // cpu_ratio numeric       (if cpu    && skew)
1133    // worker_elapsed interval (if cpu    && skew)
1134    // avg_elapsed interval    (if cpu    && skew)
1135    // total_elapsed interval  (if cpu)
1136
1137    /* WITH {CTEs}
1138       SELECT mo.name AS object
1139             {columns}
1140        FROM mz_introspection.mz_mappable_objects mo
1141             {from}
1142       WHERE {predicates}
1143       ORDER BY {order_by}, mo.name DESC
1144    */
1145    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
1146    let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
1147    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
1148    let mut predicates = vec![];
1149    let mut order_by = vec![];
1150
1151    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
1152    let mut worker_id = None;
1153    let mut seen_properties = BTreeSet::new();
1154    for property in properties {
1155        // handle each property only once (belt and suspenders)
1156        if !seen_properties.insert(property) {
1157            continue;
1158        }
1159
1160        match property {
1161            ExplainAnalyzeComputationProperty::Memory => {
1162                if skew {
1163                    let mut set_worker_id = false;
1164                    if let Some(worker_id) = worker_id {
1165                        // join condition if we're showing skew for more than one property
1166                        predicates.push(format!(
1167                            "(om.worker_id = {worker_id} OR om.worker_id IS NULL OR {worker_id} IS NULL)"
1168                        ));
1169                    } else {
1170                        worker_id = Some("om.worker_id");
1171                        columns.push("om.worker_id AS worker_id");
1172                        set_worker_id = true; // we'll add ourselves to `order_by` later
1173                    };
1174
1175                    // computes the average memory per LIR operator (for per operator ratios)
1176                    ctes.push((
1177                    "per_operator_memory_summary",
1178                    r#"
1179SELECT mlm.global_id AS global_id,
1180       mlm.lir_id AS lir_id,
1181       SUM(mas.size) AS total_memory,
1182       SUM(mas.records) AS total_records,
1183       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
1184       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
1185FROM        mz_introspection.mz_lir_mapping mlm
1186 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1187       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1188         ON (mas.operator_id = valid_id)
1189GROUP BY mlm.global_id, mlm.lir_id"#,
1190                ));
1191
1192                    // computes the memory per worker in a per operator way
1193                    ctes.push((
1194                    "per_operator_memory_per_worker",
1195                    r#"
1196SELECT mlm.global_id AS global_id,
1197       mlm.lir_id AS lir_id,
1198       mas.worker_id AS worker_id,
1199       SUM(mas.size) AS worker_memory,
1200       SUM(mas.records) AS worker_records
1201FROM        mz_introspection.mz_lir_mapping mlm
1202 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1203       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1204         ON (mas.operator_id = valid_id)
1205GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
1206                    ));
1207
1208                    // computes memory ratios per worker per operator
1209                    ctes.push((
1210                    "per_operator_memory_ratios",
1211                    r#"
1212SELECT pompw.global_id AS global_id,
1213       pompw.lir_id AS lir_id,
1214       pompw.worker_id AS worker_id,
1215       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
1216       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
1217  FROM      per_operator_memory_per_worker pompw
1218       JOIN per_operator_memory_summary poms
1219         USING (global_id, lir_id)
1220"#,
1221                    ));
1222
1223                    // summarizes each object, per worker
1224                    ctes.push((
1225                        "object_memory",
1226                        r#"
1227SELECT pompw.global_id AS global_id,
1228       pompw.worker_id AS worker_id,
1229       MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
1230       MAX(pomr.records_ratio) AS max_operator_records_ratio,
1231       SUM(pompw.worker_memory) AS worker_memory,
1232       SUM(pompw.worker_records) AS worker_records
1233FROM        per_operator_memory_per_worker pompw
1234     JOIN   per_operator_memory_ratios pomr
1235     USING (global_id, worker_id, lir_id)
1236GROUP BY pompw.global_id, pompw.worker_id
1237"#,
1238                    ));
1239
                    // summarizes each object across all of its workers
1241                    ctes.push(("object_average_memory", r#"
1242SELECT om.global_id AS global_id,
1243       SUM(om.worker_memory) AS total_memory,
1244       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
1245       SUM(om.worker_records) AS total_records,
1246       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
1247  FROM object_memory om
1248GROUP BY om.global_id"#));
1249
1250                    from.push("LEFT JOIN object_memory om USING (global_id)");
1251                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");
1252
1253                    columns.extend([
1254                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
1255                        "pg_size_pretty(om.worker_memory) AS worker_memory",
1256                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
1257                        "pg_size_pretty(oam.total_memory) AS total_memory",
1258                        "om.max_operator_records_ratio AS max_operator_records_ratio",
1259                        "om.worker_records AS worker_records",
1260                        "oam.avg_records AS avg_records",
1261                        "oam.total_records AS total_records",
1262                    ]);
1263
1264                    order_by.extend([
1265                        "max_operator_memory_ratio DESC NULLS LAST",
1266                        "max_operator_records_ratio DESC NULLS LAST",
1267                        "om.worker_memory DESC NULLS LAST",
1268                        "worker_records DESC NULLS LAST",
1269                    ]);
1270
1271                    if set_worker_id {
1272                        order_by.push("worker_id");
1273                    }
1274                } else {
1275                    // no skew, so just compute totals
1276                    ctes.push((
1277                        "per_operator_memory_totals",
1278                        r#"
1279    SELECT mlm.global_id AS global_id,
1280           mlm.lir_id AS lir_id,
1281           SUM(mas.size) AS total_memory,
1282           SUM(mas.records) AS total_records
1283    FROM        mz_introspection.mz_lir_mapping mlm
1284     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1285           JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1286             ON (mas.operator_id = valid_id)
1287    GROUP BY mlm.global_id, mlm.lir_id"#,
1288                    ));
1289
1290                    ctes.push((
1291                        "object_memory_totals",
1292                        r#"
1293SELECT pomt.global_id AS global_id,
1294       SUM(pomt.total_memory) AS total_memory,
1295       SUM(pomt.total_records) AS total_records
1296FROM per_operator_memory_totals pomt
1297GROUP BY pomt.global_id
1298"#,
1299                    ));
1300
1301                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
1302                    columns.extend([
1303                        "pg_size_pretty(omt.total_memory) AS total_memory",
1304                        "omt.total_records AS total_records",
1305                    ]);
1306                    order_by.extend([
1307                        "omt.total_memory DESC NULLS LAST",
1308                        "total_records DESC NULLS LAST",
1309                    ]);
1310                }
1311            }
1312            ExplainAnalyzeComputationProperty::Cpu => {
1313                if skew {
1314                    let mut set_worker_id = false;
1315                    if let Some(worker_id) = worker_id {
1316                        // join condition if we're showing skew for more than one property
1317                        predicates.push(format!(
1318                            "(oc.worker_id = {worker_id} OR oc.worker_id IS NULL OR {worker_id} IS NULL)"
1319                        ));
1320                    } else {
1321                        worker_id = Some("oc.worker_id");
1322                        columns.push("oc.worker_id AS worker_id");
1323                        set_worker_id = true; // we'll add ourselves to `order_by` later
1324                    };
1325
                    // computes the average CPU time per LIR operator (for per operator ratios)
1327                    ctes.push((
1328    "per_operator_cpu_summary",
1329    r#"
1330SELECT mlm.global_id AS global_id,
1331       mlm.lir_id AS lir_id,
1332       SUM(mse.elapsed_ns) AS total_ns,
1333       CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
1334FROM       mz_introspection.mz_lir_mapping mlm
1335CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1336      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1337        ON (mse.id = valid_id)
1338GROUP BY mlm.global_id, mlm.lir_id"#,
1339));
1340
1341                    // computes the CPU per worker in a per operator way
1342                    ctes.push((
1343                        "per_operator_cpu_per_worker",
1344                        r#"
1345SELECT mlm.global_id AS global_id,
1346       mlm.lir_id AS lir_id,
1347       mse.worker_id AS worker_id,
1348       SUM(mse.elapsed_ns) AS worker_ns
1349FROM       mz_introspection.mz_lir_mapping mlm
1350CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1351      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1352        ON (mse.id = valid_id)
1353GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1354                    ));
1355
1356                    // computes CPU ratios per worker per operator
1357                    ctes.push((
1358                        "per_operator_cpu_ratios",
1359                        r#"
1360SELECT pocpw.global_id AS global_id,
1361       pocpw.lir_id AS lir_id,
1362       pocpw.worker_id AS worker_id,
1363       CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
1364FROM      per_operator_cpu_per_worker pocpw
1365     JOIN per_operator_cpu_summary pocs
1366     USING (global_id, lir_id)
1367"#,
1368                    ));
1369
1370                    // summarizes each object, per worker
1371                    ctes.push((
1372                        "object_cpu",
1373                        r#"
1374SELECT pocpw.global_id AS global_id,
1375       pocpw.worker_id AS worker_id,
1376       MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
1377       SUM(pocpw.worker_ns) AS worker_ns
1378FROM      per_operator_cpu_per_worker pocpw
1379     JOIN per_operator_cpu_ratios pomr
1380     USING (global_id, worker_id, lir_id)
1381GROUP BY pocpw.global_id, pocpw.worker_id
1382"#,
1383                    ));
1384
                    // summarizes each object across all of its workers
1386                    ctes.push((
1387                        "object_average_cpu",
1388                        r#"
1389SELECT oc.global_id AS global_id,
1390       SUM(oc.worker_ns) AS total_ns,
1391       CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
1392  FROM object_cpu oc
1393GROUP BY oc.global_id"#,));
1394
1395                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
1396                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");
1397
1398                    columns.extend([
1399                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
1400                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
1401                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
1402                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
1403                    ]);
1404
1405                    order_by.extend([
1406                        "max_operator_cpu_ratio DESC NULLS LAST",
1407                        "worker_elapsed DESC NULLS LAST",
1408                    ]);
1409
1410                    if set_worker_id {
1411                        order_by.push("worker_id");
1412                    }
1413                } else {
1414                    // no skew, so just compute totals
1415                    ctes.push((
1416                        "per_operator_cpu_totals",
1417                        r#"
1418    SELECT mlm.global_id AS global_id,
1419           mlm.lir_id AS lir_id,
1420           SUM(mse.elapsed_ns) AS total_ns
1421    FROM        mz_introspection.mz_lir_mapping mlm
1422     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1423           JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1424             ON (mse.id = valid_id)
1425    GROUP BY mlm.global_id, mlm.lir_id"#,
1426                    ));
1427
1428                    ctes.push((
1429                        "object_cpu_totals",
1430                        r#"
1431SELECT poct.global_id AS global_id,
1432       SUM(poct.total_ns) AS total_ns
1433FROM per_operator_cpu_totals poct
1434GROUP BY poct.global_id
1435"#,
1436                    ));
1437
1438                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
1439                    columns
1440                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
1441                    order_by.extend(["total_elapsed DESC NULLS LAST"]);
1442                }
1443            }
1444        }
1445    }
1446
1447    // generate SQL query text
1448    let ctes = if !ctes.is_empty() {
1449        format!(
1450            "WITH {}",
1451            separated(
1452                ",\n",
1453                ctes.iter()
1454                    .map(|(name, defn)| format!("{name} AS ({defn})"))
1455            )
1456        )
1457    } else {
1458        String::new()
1459    };
1460    let columns = separated(", ", columns);
1461    let from = separated(" ", from);
1462    let predicates = if !predicates.is_empty() {
1463        format!("WHERE {}", separated(" AND ", predicates))
1464    } else {
1465        String::new()
1466    };
1467    // add mo.name last, to break ties only
1468    order_by.push("mo.name DESC");
1469    let order_by = separated(", ", order_by);
1470    let query = format!(
1471        r#"{ctes}
1472SELECT {columns}
1473FROM {from}
1474{predicates}
1475ORDER BY {order_by}"#
1476    );
1477
1478    if statement.as_sql {
1479        let rows = vec![Row::pack_slice(&[Datum::String(
1480            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1481                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1482            })?,
1483        )])];
1484        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1485
1486        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1487    } else {
1488        let (show_select, resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1489        scx.record_sql_impl_ids(&resolved_ids);
1490        show_select.plan()
1491    }
1492}
1493
1494pub fn plan_explain_timestamp(
1495    scx: &StatementContext,
1496    explain: ExplainTimestampStatement<Aug>,
1497) -> Result<Plan, PlanError> {
1498    let (format, _verbose_syntax) = match explain.format() {
1499        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1500        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1501        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1502        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1503    };
1504
1505    let raw_plan = {
1506        let query::PlannedRootQuery {
1507            expr: raw_plan,
1508            desc: _,
1509            finishing: _,
1510            scope: _,
1511        } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1512        if raw_plan.contains_parameters()? {
1513            return Err(PlanError::ParameterNotAllowed(
1514                "EXPLAIN TIMESTAMP".to_string(),
1515            ));
1516        }
1517
1518        raw_plan
1519    };
1520    let when = query::plan_as_of(scx, explain.select.as_of)?;
1521
1522    Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1523        format,
1524        raw_plan,
1525        when,
1526    }))
1527}
1528
// Generates `SubscribeOptionExtracted`, the typed form of the `SUBSCRIBE`
// options consumed in this file via `try_into()`. It exposes `snapshot` and
// `progress`, each an optional `bool` whose default is chosen by the caller.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1530
1531pub fn describe_subscribe(
1532    scx: &StatementContext,
1533    stmt: SubscribeStatement<Aug>,
1534) -> Result<StatementDesc, PlanError> {
1535    let relation_desc = match stmt.relation {
1536        SubscribeRelation::Name(name) => {
1537            let item = scx.get_item_by_resolved_name(&name)?;
1538            match item.relation_desc() {
1539                Some(desc) => desc.into_owned(),
1540                None => sql_bail!(
1541                    "'{}' cannot be subscribed to because it is a {}",
1542                    name.full_name_str(),
1543                    item.item_type(),
1544                ),
1545            }
1546        }
1547        SubscribeRelation::Query(query) => {
1548            let query::PlannedRootQuery { desc, .. } =
1549                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
1550            desc
1551        }
1552    };
1553    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
1554    let progress = progress.unwrap_or(false);
1555    let mut desc = RelationDesc::builder().with_column(
1556        "mz_timestamp",
1557        SqlScalarType::Numeric {
1558            max_scale: Some(NumericMaxScale::ZERO),
1559        }
1560        .nullable(false),
1561    );
1562    if progress {
1563        desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
1564    }
1565
1566    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
1567    match stmt.output {
1568        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
1569            desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
1570            for (name, mut ty) in relation_desc.into_iter() {
1571                if progress {
1572                    ty.nullable = true;
1573                }
1574                desc = desc.with_column(name, ty);
1575            }
1576        }
1577        SubscribeOutput::EnvelopeUpsert { key_columns }
1578        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
1579            desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
1580            let key_columns = key_columns
1581                .into_iter()
1582                .map(normalize::column_name)
1583                .collect_vec();
1584            let mut before_values_desc = RelationDesc::builder();
1585            let mut after_values_desc = RelationDesc::builder();
1586
1587            // Add the key columns in the order that they're specified.
1588            for column_name in &key_columns {
1589                let mut column_ty = relation_desc
1590                    .get_by_name(column_name)
1591                    .map(|(_pos, ty)| ty.clone())
1592                    .ok_or_else(|| PlanError::UnknownColumn {
1593                        table: None,
1594                        column: column_name.clone(),
1595                        similar: Box::new([]),
1596                    })?;
1597                if progress {
1598                    column_ty.nullable = true;
1599                }
1600                desc = desc.with_column(column_name, column_ty);
1601            }
1602
1603            // Then add the remaining columns in the order from the original
1604            // table, filtering out the key columns since we added those above.
1605            for (mut name, mut ty) in relation_desc
1606                .into_iter()
1607                .filter(|(name, _ty)| !key_columns.contains(name))
1608            {
1609                ty.nullable = true;
1610                before_values_desc =
1611                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
1612                if debezium {
1613                    name = format!("after_{}", name).into();
1614                }
1615                after_values_desc = after_values_desc.with_column(name, ty);
1616            }
1617
1618            if debezium {
1619                desc = desc.concat(before_values_desc);
1620            }
1621            desc = desc.concat(after_values_desc);
1622        }
1623    }
1624    Ok(StatementDesc::new(Some(desc.finish())))
1625}
1626
1627pub fn plan_subscribe(
1628    scx: &StatementContext,
1629    SubscribeStatement {
1630        relation,
1631        options,
1632        as_of,
1633        up_to,
1634        output,
1635    }: SubscribeStatement<Aug>,
1636    params: &Params,
1637    copy_to: Option<CopyFormat>,
1638) -> Result<Plan, PlanError> {
1639    let (from, desc, scope) = match relation {
1640        SubscribeRelation::Name(name) => {
1641            let item = scx.get_item_by_resolved_name(&name)?;
1642            let Some(desc) = item.relation_desc() else {
1643                sql_bail!(
1644                    "'{}' cannot be subscribed to because it is a {}",
1645                    name.full_name_str(),
1646                    item.item_type(),
1647                );
1648            };
1649            let item_name = match name {
1650                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
1651                _ => None,
1652            };
1653            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
1654            (
1655                SubscribeFrom::Id(item.global_id()),
1656                desc.into_owned(),
1657                scope,
1658            )
1659        }
1660        SubscribeRelation::Query(query) => {
1661            #[allow(deprecated)] // TODO(aalexandrov): Use HirRelationExpr in Subscribe
1662            let query::PlannedRootQuery {
1663                mut expr,
1664                desc,
1665                finishing,
1666                scope,
1667            } = query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
1668            expr.bind_parameters_and_simplify_offset(scx, QueryLifetime::Subscribe, params)?;
1669            let query = query::PlannedRootQuery {
1670                expr,
1671                desc,
1672                finishing,
1673                scope,
1674            };
1675            // There's no way to apply finishing operations to a `SUBSCRIBE` directly, so the
1676            // finishing should have already been turned into a `TopK` by
1677            // `plan_query` / `plan_root_query`, upon seeing the `QueryLifetime::Subscribe`.
1678            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
1679                &query.finishing,
1680                query.desc.arity()
1681            ));
1682            let desc = query.desc.clone();
1683            (
1684                SubscribeFrom::Query {
1685                    expr: query.expr,
1686                    desc: query.desc,
1687                },
1688                desc,
1689                query.scope,
1690            )
1691        }
1692    };
1693
1694    let when = query::plan_as_of(scx, as_of)?;
1695    let up_to = up_to
1696        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
1697        .transpose()?;
1698
1699    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
1700    let ecx = ExprContext {
1701        qcx: &qcx,
1702        name: "",
1703        scope: &scope,
1704        relation_type: desc.typ(),
1705        allow_aggregates: false,
1706        allow_subqueries: true,
1707        allow_parameters: true,
1708        allow_windows: false,
1709    };
1710
1711    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1712    let output = match output {
1713        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
1714        SubscribeOutput::EnvelopeUpsert { key_columns } => {
1715            let order_by = key_columns
1716                .iter()
1717                .map(|ident| OrderByExpr {
1718                    expr: Expr::Identifier(vec![ident.clone()]),
1719                    asc: None,
1720                    nulls_last: None,
1721                })
1722                .collect_vec();
1723            let (order_by, map_exprs) = query::plan_order_by_exprs(
1724                &ExprContext {
1725                    name: "ENVELOPE UPSERT KEY clause",
1726                    ..ecx
1727                },
1728                &order_by[..],
1729                &output_columns[..],
1730            )?;
1731            if !map_exprs.is_empty() {
1732                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
1733            }
1734            check_distinct_key_columns(&order_by, &output_columns)?;
1735            plan::SubscribeOutput::EnvelopeUpsert {
1736                order_by_keys: order_by,
1737            }
1738        }
1739        SubscribeOutput::EnvelopeDebezium { key_columns } => {
1740            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
1741            let order_by = key_columns
1742                .iter()
1743                .map(|ident| OrderByExpr {
1744                    expr: Expr::Identifier(vec![ident.clone()]),
1745                    asc: None,
1746                    nulls_last: None,
1747                })
1748                .collect_vec();
1749            let (order_by, map_exprs) = query::plan_order_by_exprs(
1750                &ExprContext {
1751                    name: "ENVELOPE DEBEZIUM KEY clause",
1752                    ..ecx
1753                },
1754                &order_by[..],
1755                &output_columns[..],
1756            )?;
1757            if !map_exprs.is_empty() {
1758                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
1759            }
1760            check_distinct_key_columns(&order_by, &output_columns)?;
1761            plan::SubscribeOutput::EnvelopeDebezium {
1762                order_by_keys: order_by,
1763            }
1764        }
1765        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
1766            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
1767            let mz_diff = "mz_diff".into();
1768            let output_columns = std::iter::once((0, &mz_diff))
1769                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
1770                .collect_vec();
1771            match query::plan_order_by_exprs(
1772                &ExprContext {
1773                    name: "WITHIN TIMESTAMP ORDER BY clause",
1774                    ..ecx
1775                },
1776                &order_by[..],
1777                &output_columns[..],
1778            ) {
1779                Err(PlanError::UnknownColumn {
1780                    table: None,
1781                    column,
1782                    similar: _,
1783                }) if &column == &mz_diff => {
1784                    // mz_diff is being used in an expression. Since mz_diff isn't part of the table
1785                    // it looks like an unknown column. Instead, return a better error
1786                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1787                }
1788                Err(e) => return Err(e),
1789                Ok((order_by, map_exprs)) => {
1790                    if !map_exprs.is_empty() {
1791                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1792                    }
1793
1794                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
1795                }
1796            }
1797        }
1798    };
1799
1800    let SubscribeOptionExtracted {
1801        progress, snapshot, ..
1802    } = options.try_into()?;
1803    Ok(Plan::Subscribe(SubscribePlan {
1804        from,
1805        when,
1806        up_to,
1807        with_snapshot: snapshot.unwrap_or(true),
1808        copy_to,
1809        emit_progress: progress.unwrap_or(false),
1810        output,
1811    }))
1812}
1813
1814/// Ensures each `ColumnOrder` in `order_by` references a distinct column,
1815/// returning `DuplicateKeyColumnInSubscribeEnvelope` on the first repeat.
1816fn check_distinct_key_columns(
1817    order_by: &[ColumnOrder],
1818    output_columns: &[(usize, &mz_repr::ColumnName)],
1819) -> Result<(), PlanError> {
1820    let mut seen = BTreeSet::new();
1821    for co in order_by {
1822        if !seen.insert(co.column) {
1823            return Err(PlanError::DuplicateKeyColumnInSubscribeEnvelope {
1824                column_name: output_columns[co.column].1.to_string(),
1825            });
1826        }
1827    }
1828    Ok(())
1829}
1830
1831pub fn describe_copy_from_table(
1832    scx: &StatementContext,
1833    table_name: <Aug as AstInfo>::ItemName,
1834    columns: Vec<Ident>,
1835) -> Result<StatementDesc, PlanError> {
1836    let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1837    Ok(StatementDesc::new(Some(desc)))
1838}
1839
1840pub fn describe_copy_item(
1841    scx: &StatementContext,
1842    object_name: <Aug as AstInfo>::ItemName,
1843    columns: Vec<Ident>,
1844) -> Result<StatementDesc, PlanError> {
1845    let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1846    Ok(StatementDesc::new(Some(desc)))
1847}
1848
1849pub fn describe_copy(
1850    scx: &StatementContext,
1851    CopyStatement {
1852        relation,
1853        direction,
1854        ..
1855    }: CopyStatement<Aug>,
1856) -> Result<StatementDesc, PlanError> {
1857    Ok(match (relation, direction) {
1858        (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1859            describe_copy_item(scx, name, columns)?
1860        }
1861        (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1862            describe_copy_from_table(scx, name, columns)?
1863        }
1864        (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1865        (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1866    }
1867    .with_is_copy())
1868}
1869
1870fn plan_copy_to_expr(
1871    scx: &StatementContext,
1872    select_plan: SelectPlan,
1873    desc: RelationDesc,
1874    to: &Expr<Aug>,
1875    format: CopyFormat,
1876    options: CopyOptionExtracted,
1877) -> Result<Plan, PlanError> {
1878    let conn_id = match options.aws_connection {
1879        Some(conn_id) => CatalogItemId::from(conn_id),
1880        None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1881    };
1882    let connection = scx.get_item(&conn_id).connection()?;
1883
1884    match connection {
1885        mz_storage_types::connections::Connection::Aws(_) => {}
1886        _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1887    }
1888
1889    let format = match format {
1890        CopyFormat::Csv => {
1891            let quote = extract_byte_param_value(options.quote, "quote")?;
1892            let escape = extract_byte_param_value(options.escape, "escape")?;
1893            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1894            S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1895                CopyCsvFormatParams::try_new(
1896                    delimiter,
1897                    quote,
1898                    escape,
1899                    options.header,
1900                    options.null,
1901                )
1902                .map_err(|e| sql_err!("{}", e))?,
1903            ))
1904        }
1905        CopyFormat::Parquet => {
1906            // Validate that the output desc can be formatted as parquet.
1907            // COPY TO does not apply any type overrides, so pass `|_| None`.
1908            ArrowBuilder::validate_desc_for_parquet(&desc, |_| None)
1909                .map_err(|e| sql_err!("{}", e))?;
1910            S3SinkFormat::Parquet
1911        }
1912        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1913        CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1914    };
1915
1916    // Converting the to expr to a HirScalarExpr
1917    let mut to_expr = to.clone();
1918    transform_ast::transform(scx, &mut to_expr)?;
1919    let relation_type = RelationDesc::empty();
1920    let ecx = &ExprContext {
1921        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1922        name: "COPY TO target",
1923        scope: &Scope::empty(),
1924        relation_type: relation_type.typ(),
1925        allow_aggregates: false,
1926        allow_subqueries: false,
1927        allow_parameters: false,
1928        allow_windows: false,
1929    };
1930
1931    let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;
1932
1933    if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1934        sql_bail!(
1935            "MAX FILE SIZE cannot be less than {}",
1936            MIN_S3_SINK_FILE_SIZE
1937        );
1938    }
1939    if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1940        sql_bail!(
1941            "MAX FILE SIZE cannot be greater than {}",
1942            MAX_S3_SINK_FILE_SIZE
1943        );
1944    }
1945
1946    Ok(Plan::CopyTo(CopyToPlan {
1947        select_plan,
1948        desc,
1949        to,
1950        connection: connection.to_owned(),
1951        connection_id: conn_id,
1952        format,
1953        max_file_size: options.max_file_size.as_bytes(),
1954    }))
1955}
1956
1957fn plan_copy_from(
1958    scx: &StatementContext,
1959    target: &CopyTarget<Aug>,
1960    table_name: ResolvedItemName,
1961    columns: Vec<Ident>,
1962    format: Option<CopyFormat>,
1963    options: CopyOptionExtracted,
1964) -> Result<Plan, PlanError> {
1965    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
1966        match option {
1967            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
1968            None => Ok(()),
1969        }
1970    }
1971
1972    let source = match target {
1973        CopyTarget::Stdin => CopyFromSource::Stdin,
1974        CopyTarget::Expr(from) => {
1975            // Converting the expr to an HirScalarExpr
1976            let mut from_expr = from.clone();
1977            transform_ast::transform(scx, &mut from_expr)?;
1978            let relation_type = RelationDesc::empty();
1979            let ecx = &ExprContext {
1980                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1981                name: "COPY FROM target",
1982                scope: &Scope::empty(),
1983                relation_type: relation_type.typ(),
1984                allow_aggregates: false,
1985                allow_subqueries: false,
1986                allow_parameters: false,
1987                allow_windows: false,
1988            };
1989            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;
1990
1991            match options.aws_connection {
1992                Some(conn_id) => {
1993                    let conn_id = CatalogItemId::from(conn_id);
1994
1995                    // Validate the connection type is one we expect.
1996                    let connection = match scx.get_item(&conn_id).connection()? {
1997                        mz_storage_types::connections::Connection::Aws(conn) => conn,
1998                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
1999                    };
2000
2001                    CopyFromSource::AwsS3 {
2002                        uri: from,
2003                        connection,
2004                        connection_id: conn_id,
2005                    }
2006                }
2007                None => CopyFromSource::Url(from),
2008            }
2009        }
2010        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
2011    };
2012
2013    // COPY FROM a URL or S3 bucket only supports CSV and Parquet. Unlike COPY
2014    // FROM STDIN there's no sensible default format, so one must be specified
2015    // explicitly. Reject unsupported formats here in planning; the coordinator
2016    // relies on this and would otherwise soft-panic.
2017    let format = match &source {
2018        CopyFromSource::Stdin => format.unwrap_or(CopyFormat::Text),
2019        CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => match format {
2020            None => sql_bail!("COPY FROM <expr> requires a FORMAT option"),
2021            Some(CopyFormat::Text) => bail_unsupported!("FORMAT TEXT"),
2022            Some(CopyFormat::Binary) => bail_unsupported!("FORMAT BINARY"),
2023            Some(format @ (CopyFormat::Csv | CopyFormat::Parquet)) => format,
2024        },
2025    };
2026
2027    let params = match format {
2028        CopyFormat::Text => {
2029            only_available_with_csv(options.quote, "quote")?;
2030            only_available_with_csv(options.escape, "escape")?;
2031            only_available_with_csv(options.header, "HEADER")?;
2032            let delimiter =
2033                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
2034            let null = match options.null {
2035                Some(null) => Cow::from(null),
2036                None => Cow::from("\\N"),
2037            };
2038            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
2039        }
2040        CopyFormat::Csv => {
2041            let quote = extract_byte_param_value(options.quote, "quote")?;
2042            let escape = extract_byte_param_value(options.escape, "escape")?;
2043            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
2044            CopyFormatParams::Csv(
2045                CopyCsvFormatParams::try_new(
2046                    delimiter,
2047                    quote,
2048                    escape,
2049                    options.header,
2050                    options.null,
2051                )
2052                .map_err(|e| sql_err!("{}", e))?,
2053            )
2054        }
2055        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
2056        CopyFormat::Parquet => CopyFormatParams::Parquet,
2057    };
2058
2059    let filter = match (options.files, options.pattern) {
2060        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
2061        (Some(files), None) => Some(CopyFromFilter::Files(files)),
2062        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
2063        (None, None) => None,
2064    };
2065
2066    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
2067        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
2068    }
2069
2070    let table_name_string = table_name.full_name_str();
2071
2072    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;
2073
2074    let Some(mfp) = maybe_mfp else {
2075        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
2076    };
2077
2078    Ok(Plan::CopyFrom(CopyFromPlan {
2079        target_id: id,
2080        target_name: table_name_string,
2081        source,
2082        columns,
2083        source_desc,
2084        mfp,
2085        params,
2086        filter,
2087    }))
2088}
2089
2090fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2091    match v {
2092        Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2093        Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2094        None => Ok(None),
2095    }
2096}
2097
// Generates `CopyOptionExtracted`, the typed form of the `COPY` options
// consumed in this file via `CopyOptionExtracted::try_from`. Every option is
// optional except `MaxFileSize`, which falls back to 256 MB when the user
// does not provide it.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
2111
2112pub fn plan_copy(
2113    scx: &StatementContext,
2114    CopyStatement {
2115        relation,
2116        direction,
2117        target,
2118        options,
2119    }: CopyStatement<Aug>,
2120) -> Result<Plan, PlanError> {
2121    let options = CopyOptionExtracted::try_from(options)?;
2122    // Parse any user-provided FORMAT option. If not provided, will default to
2123    // Text for COPY TO STDOUT and COPY FROM STDIN, but will error for COPY TO <expr>.
2124    let format = options
2125        .format
2126        .as_ref()
2127        .map(|format| match format.to_lowercase().as_str() {
2128            "text" => Ok(CopyFormat::Text),
2129            "csv" => Ok(CopyFormat::Csv),
2130            "binary" => Ok(CopyFormat::Binary),
2131            "parquet" => Ok(CopyFormat::Parquet),
2132            _ => sql_bail!("unknown FORMAT: {}", format),
2133        })
2134        .transpose()?;
2135
2136    match (&direction, &target) {
2137        (CopyDirection::To, CopyTarget::Stdout) => {
2138            if options.delimiter.is_some() {
2139                sql_bail!("COPY TO does not support DELIMITER option yet");
2140            }
2141            if options.quote.is_some() {
2142                sql_bail!("COPY TO does not support QUOTE option yet");
2143            }
2144            if options.escape.is_some() {
2145                sql_bail!("COPY TO does not support ESCAPE option yet");
2146            }
2147            if options.null.is_some() {
2148                sql_bail!("COPY TO does not support NULL option yet");
2149            }
2150            // `HEADER false` is the default and already honored; only an
2151            // enabled header is unimplemented. Silently accepting it would
2152            // make clients strip the first data row as a presumed header.
2153            if options.header == Some(true) {
2154                sql_bail!("COPY TO does not support HEADER option yet");
2155            }
2156            match relation {
2157                CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
2158                CopyRelation::Select(stmt) => Ok(plan_select(
2159                    scx,
2160                    stmt,
2161                    &Params::empty(),
2162                    Some(format.unwrap_or(CopyFormat::Text)),
2163                )?),
2164                CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
2165                    scx,
2166                    stmt,
2167                    &Params::empty(),
2168                    Some(format.unwrap_or(CopyFormat::Text)),
2169                )?),
2170            }
2171        }
2172        (CopyDirection::From, target) => match relation {
2173            CopyRelation::Named { name, columns } => {
2174                plan_copy_from(scx, target, name, columns, format, options)
2175            }
2176            _ => sql_bail!("COPY FROM {} not supported", target),
2177        },
2178        (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
2179            let format = match format {
2180                Some(inner) => inner,
2181                _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
2182            };
2183
2184            let stmt = match relation {
2185                CopyRelation::Named { name, columns } => {
2186                    if !columns.is_empty() {
2187                        // TODO(mouli): Add support for this
2188                        sql_bail!(
2189                            "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
2190                        );
2191                    }
2192                    // Generate a synthetic SELECT query that just gets the table
2193                    let query = Query {
2194                        ctes: CteBlock::empty(),
2195                        body: SetExpr::Table(name),
2196                        order_by: vec![],
2197                        limit: None,
2198                        offset: None,
2199                    };
2200                    SelectStatement { query, as_of: None }
2201                }
2202                CopyRelation::Select(stmt) => {
2203                    if !stmt.query.order_by.is_empty() {
2204                        sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
2205                    }
2206                    stmt
2207                }
2208                CopyRelation::Subscribe(_) => {
2209                    sql_bail!("COPY {} {} not supported", direction, target)
2210                }
2211            };
2212
2213            let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
2214            plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
2215        }
2216        _ => sql_bail!("COPY {} {} not supported", direction, target),
2217    }
2218}