mz_sql/plan/statement/
dml.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data manipulation language (DML).
11//!
12//! This module houses the handlers for statements that manipulate data, like
13//! `INSERT`, `SELECT`, `SUBSCRIBE`, and `COPY`.
14
15use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19
20use mz_arrow_util::builder::ArrowBuilder;
21use mz_expr::visit::Visit;
22use mz_expr::{MirRelationExpr, RowSetFinishing};
23use mz_ore::num::NonNeg;
24use mz_ore::soft_panic_or_log;
25use mz_ore::str::separated;
26use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
27use mz_repr::adt::numeric::NumericMaxScale;
28use mz_repr::bytes::ByteSize;
29use mz_repr::explain::{ExplainConfig, ExplainFormat};
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
32use mz_sql_parser::ast::{
33    CteBlock, ExplainAnalyzeComputationProperty, ExplainAnalyzeProperty, ExplainAnalyzeStatement,
34    ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
35    ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
36    SetExpr, SubscribeOutput, UnresolvedItemName,
37};
38use mz_sql_parser::ident;
39use mz_storage_types::sinks::{
40    KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
41    MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
42};
43
44use crate::ast::display::AstDisplay;
45use crate::ast::{
46    AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
47    DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
48    SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
49    UpdateStatement,
50};
51use crate::catalog::CatalogItemType;
52use crate::names::{Aug, ResolvedItemName};
53use crate::normalize;
54use crate::plan::query::{
55    ExprContext, QueryLifetime, offset_into_value, plan_as_of_or_up_to, plan_expr,
56};
57use crate::plan::scope::Scope;
58use crate::plan::statement::show::ShowSelect;
59use crate::plan::statement::{StatementContext, StatementDesc, ddl};
60use crate::plan::{
61    self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
62    ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
63};
64use crate::plan::{
65    CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
66    QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
67};
68use crate::plan::{CopyFromSource, with_options};
69use crate::session::vars::{self, ENABLE_COPY_FROM_REMOTE};
70
71// TODO(benesch): currently, describing a `SELECT` or `INSERT` query
72// plans the whole query to determine its shape and parameter types,
73// and then throws away that plan. If we were smarter, we'd stash that
74// plan somewhere so we don't have to recompute it when the query is
75// executed.
76
77pub fn describe_insert(
78    scx: &StatementContext,
79    InsertStatement {
80        table_name,
81        columns,
82        source,
83        returning,
84    }: InsertStatement<Aug>,
85) -> Result<StatementDesc, PlanError> {
86    let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
87    let desc = if returning.expr.is_empty() {
88        None
89    } else {
90        Some(returning.desc)
91    };
92    Ok(StatementDesc::new(desc))
93}
94
95pub fn plan_insert(
96    scx: &StatementContext,
97    InsertStatement {
98        table_name,
99        columns,
100        source,
101        returning,
102    }: InsertStatement<Aug>,
103    params: &Params,
104) -> Result<Plan, PlanError> {
105    let (id, mut expr, returning) =
106        query::plan_insert_query(scx, table_name, columns, source, returning)?;
107    expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
108    let returning = returning
109        .expr
110        .into_iter()
111        .map(|mut expr| {
112            expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
113            expr.lower_uncorrelated()
114        })
115        .collect::<Result<Vec<_>, _>>()?;
116
117    Ok(Plan::Insert(InsertPlan {
118        id,
119        values: expr,
120        returning,
121    }))
122}
123
124pub fn describe_delete(
125    scx: &StatementContext,
126    stmt: DeleteStatement<Aug>,
127) -> Result<StatementDesc, PlanError> {
128    query::plan_delete_query(scx, stmt)?;
129    Ok(StatementDesc::new(None))
130}
131
132pub fn plan_delete(
133    scx: &StatementContext,
134    stmt: DeleteStatement<Aug>,
135    params: &Params,
136) -> Result<Plan, PlanError> {
137    let rtw_plan = query::plan_delete_query(scx, stmt)?;
138    plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
139}
140
141pub fn describe_update(
142    scx: &StatementContext,
143    stmt: UpdateStatement<Aug>,
144) -> Result<StatementDesc, PlanError> {
145    query::plan_update_query(scx, stmt)?;
146    Ok(StatementDesc::new(None))
147}
148
149pub fn plan_update(
150    scx: &StatementContext,
151    stmt: UpdateStatement<Aug>,
152    params: &Params,
153) -> Result<Plan, PlanError> {
154    let rtw_plan = query::plan_update_query(scx, stmt)?;
155    plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
156}
157
158pub fn plan_read_then_write(
159    scx: &StatementContext,
160    kind: MutationKind,
161    params: &Params,
162    query::ReadThenWritePlan {
163        id,
164        mut selection,
165        finishing,
166        assignments,
167    }: query::ReadThenWritePlan,
168) -> Result<Plan, PlanError> {
169    selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
170    let mut assignments_outer = BTreeMap::new();
171    for (idx, mut set) in assignments {
172        set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
173        let set = set.lower_uncorrelated()?;
174        assignments_outer.insert(idx, set);
175    }
176
177    Ok(Plan::ReadThenWrite(ReadThenWritePlan {
178        id,
179        selection,
180        finishing,
181        assignments: assignments_outer,
182        kind,
183        returning: Vec::new(),
184    }))
185}
186
187pub fn describe_select(
188    scx: &StatementContext,
189    stmt: SelectStatement<Aug>,
190) -> Result<StatementDesc, PlanError> {
191    if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
192        return Ok(StatementDesc::new(Some(desc)));
193    }
194
195    let query::PlannedRootQuery { desc, .. } =
196        query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
197    Ok(StatementDesc::new(Some(desc)))
198}
199
200pub fn plan_select(
201    scx: &StatementContext,
202    select: SelectStatement<Aug>,
203    params: &Params,
204    copy_to: Option<CopyFormat>,
205) -> Result<Plan, PlanError> {
206    if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
207        return Ok(Plan::SideEffectingFunc(f));
208    }
209
210    let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
211    Ok(Plan::Select(plan))
212}
213
214fn plan_select_inner(
215    scx: &StatementContext,
216    select: SelectStatement<Aug>,
217    params: &Params,
218    copy_to: Option<CopyFormat>,
219) -> Result<(SelectPlan, RelationDesc), PlanError> {
220    let when = query::plan_as_of(scx, select.as_of.clone())?;
221    let lifetime = QueryLifetime::OneShot;
222    let query::PlannedRootQuery {
223        mut expr,
224        desc,
225        finishing,
226        scope: _,
227    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
228    expr.bind_parameters(scx, lifetime, params)?;
229
230    // OFFSET clauses in `expr` should become constants with the above binding of parameters.
231    // Let's check this and simplify them to literals.
232    expr.try_visit_mut_pre(&mut |expr| {
233        if let HirRelationExpr::TopK { offset, .. } = expr {
234            let offset_value = offset_into_value(offset.take())?;
235            *offset = HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64);
236        }
237        Ok::<(), PlanError>(())
238    })?;
239    // (We don't need to simplify LIMIT clauses in `expr`, because we can handle non-constant
240    // expressions there. If they happen to be simplifiable to literals, then the optimizer will do
241    // so later.)
242
243    // We need to concretize the `limit` and `offset` of the RowSetFinishing, so that we go from
244    // `RowSetFinishing<HirScalarExpr, HirScalarExpr>` to `RowSetFinishing`.
245    // This involves binding parameters and evaluating each expression to a number.
246    // (This should be possible even for `limit` here, because we are at the top level of a SELECT,
247    // so this `limit` has to be a constant.)
248    let limit = match finishing.limit {
249        None => None,
250        Some(mut limit) => {
251            limit.bind_parameters(scx, lifetime, params)?;
252            // TODO: Call `try_into_literal_int64` instead of `as_literal`.
253            let Some(limit) = limit.as_literal() else {
254                sql_bail!(
255                    "Top-level LIMIT must be a constant expression, got {}",
256                    limit
257                )
258            };
259            match limit {
260                Datum::Null => None,
261                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
262                _ => {
263                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
264                    sql_bail!("LIMIT must be a non-negative INT or NULL")
265                }
266            }
267        }
268    };
269    let offset = {
270        let mut offset = finishing.offset.clone();
271        offset.bind_parameters(scx, lifetime, params)?;
272        let offset = offset_into_value(offset.take())?;
273        offset
274            .try_into()
275            .expect("checked in offset_into_value that it is not negative")
276    };
277
278    let plan = SelectPlan {
279        source: expr,
280        when,
281        finishing: RowSetFinishing {
282            limit,
283            offset,
284            project: finishing.project,
285            order_by: finishing.order_by,
286        },
287        copy_to,
288        select: Some(Box::new(select)),
289    };
290
291    Ok((plan, desc))
292}
293
294pub fn describe_explain_plan(
295    scx: &StatementContext,
296    explain: ExplainPlanStatement<Aug>,
297) -> Result<StatementDesc, PlanError> {
298    let mut relation_desc = RelationDesc::builder();
299
300    match explain.stage() {
301        ExplainStage::RawPlan => {
302            let name = "Raw Plan";
303            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
304        }
305        ExplainStage::DecorrelatedPlan => {
306            let name = "Decorrelated Plan";
307            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
308        }
309        ExplainStage::LocalPlan => {
310            let name = "Locally Optimized Plan";
311            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
312        }
313        ExplainStage::GlobalPlan => {
314            let name = "Optimized Plan";
315            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
316        }
317        ExplainStage::PhysicalPlan => {
318            let name = "Physical Plan";
319            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
320        }
321        ExplainStage::Trace => {
322            relation_desc = relation_desc
323                .with_column("Time", SqlScalarType::UInt64.nullable(false))
324                .with_column("Path", SqlScalarType::String.nullable(false))
325                .with_column("Plan", SqlScalarType::String.nullable(false));
326        }
327        ExplainStage::PlanInsights => {
328            let name = "Plan Insights";
329            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
330        }
331    };
332    let relation_desc = relation_desc.finish();
333
334    Ok(
335        StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
336            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
337            _ => vec![],
338        }),
339    )
340}
341
342pub fn describe_explain_pushdown(
343    scx: &StatementContext,
344    statement: ExplainPushdownStatement<Aug>,
345) -> Result<StatementDesc, PlanError> {
346    let relation_desc = RelationDesc::builder()
347        .with_column("Source", SqlScalarType::String.nullable(false))
348        .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
349        .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
350        .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
351        .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
352        .finish();
353
354    Ok(
355        StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
356            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
357            _ => vec![],
358        }),
359    )
360}
361
362pub fn describe_explain_analyze(
363    _scx: &StatementContext,
364    statement: ExplainAnalyzeStatement<Aug>,
365) -> Result<StatementDesc, PlanError> {
366    if statement.as_sql {
367        let relation_desc = RelationDesc::builder()
368            .with_column("SQL", SqlScalarType::String.nullable(false))
369            .finish();
370        return Ok(StatementDesc::new(Some(relation_desc)));
371    }
372
373    match statement.properties {
374        ExplainAnalyzeProperty::Computation { properties, skew } => {
375            let mut relation_desc = RelationDesc::builder()
376                .with_column("operator", SqlScalarType::String.nullable(false));
377
378            if skew {
379                relation_desc =
380                    relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
381            }
382
383            let mut seen_properties = BTreeSet::new();
384            for property in properties {
385                // handle each property only once (belt and suspenders)
386                if !seen_properties.insert(property) {
387                    continue;
388                }
389
390                match property {
391                    ExplainAnalyzeComputationProperty::Memory if skew => {
392                        let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
393                        relation_desc = relation_desc
394                            .with_column("memory_ratio", numeric.clone())
395                            .with_column("worker_memory", SqlScalarType::String.nullable(true))
396                            .with_column("avg_memory", SqlScalarType::String.nullable(true))
397                            .with_column("total_memory", SqlScalarType::String.nullable(true))
398                            .with_column("records_ratio", numeric.clone())
399                            .with_column("worker_records", numeric.clone())
400                            .with_column("avg_records", numeric.clone())
401                            .with_column("total_records", numeric);
402                    }
403                    ExplainAnalyzeComputationProperty::Memory => {
404                        relation_desc = relation_desc
405                            .with_column("total_memory", SqlScalarType::String.nullable(true))
406                            .with_column(
407                                "total_records",
408                                SqlScalarType::Numeric { max_scale: None }.nullable(true),
409                            );
410                    }
411                    ExplainAnalyzeComputationProperty::Cpu => {
412                        if skew {
413                            relation_desc = relation_desc
414                                .with_column(
415                                    "cpu_ratio",
416                                    SqlScalarType::Numeric { max_scale: None }.nullable(true),
417                                )
418                                .with_column(
419                                    "worker_elapsed",
420                                    SqlScalarType::Interval.nullable(true),
421                                )
422                                .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
423                        }
424                        relation_desc = relation_desc
425                            .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
426                    }
427                }
428            }
429
430            let relation_desc = relation_desc.finish();
431            Ok(StatementDesc::new(Some(relation_desc)))
432        }
433        ExplainAnalyzeProperty::Hints => {
434            let relation_desc = RelationDesc::builder()
435                .with_column("operator", SqlScalarType::String.nullable(true))
436                .with_column("levels", SqlScalarType::Int64.nullable(true))
437                .with_column("to_cut", SqlScalarType::Int64.nullable(true))
438                .with_column("hint", SqlScalarType::Float64.nullable(true))
439                .with_column("savings", SqlScalarType::String.nullable(true))
440                .finish();
441            Ok(StatementDesc::new(Some(relation_desc)))
442        }
443    }
444}
445
446pub fn describe_explain_timestamp(
447    scx: &StatementContext,
448    ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
449) -> Result<StatementDesc, PlanError> {
450    let relation_desc = RelationDesc::builder()
451        .with_column("Timestamp", SqlScalarType::String.nullable(false))
452        .finish();
453
454    Ok(StatementDesc::new(Some(relation_desc))
455        .with_params(describe_select(scx, select)?.param_types))
456}
457
458pub fn describe_explain_schema(
459    _: &StatementContext,
460    ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
461) -> Result<StatementDesc, PlanError> {
462    let relation_desc = RelationDesc::builder()
463        .with_column("Schema", SqlScalarType::String.nullable(false))
464        .finish();
465    Ok(StatementDesc::new(Some(relation_desc)))
466}
467
468// Currently, there are two reasons for why a flag should be `Option<bool>` instead of simply
469// `bool`:
470// - When it's an override of a global feature flag, for example optimizer feature flags. In this
471//   case, we need not just false and true, but also None to say "take the value of the global
472//   flag".
473// - When it's an override of whether SOFT_ASSERTIONS are enabled. For example, when `Arity` is not
474//   explicitly given in the EXPLAIN command, then we'd like staging and prod to default to true,
475//   but otherwise we'd like to default to false.
476generate_extracted_config!(
477    ExplainPlanOption,
478    (Arity, Option<bool>, Default(None)),
479    (Cardinality, bool, Default(false)),
480    (ColumnNames, bool, Default(false)),
481    (FilterPushdown, Option<bool>, Default(None)),
482    (HumanizedExpressions, Option<bool>, Default(None)),
483    (JoinImplementations, bool, Default(false)),
484    (Keys, bool, Default(false)),
485    (LinearChains, bool, Default(false)),
486    (NoFastPath, bool, Default(false)),
487    (NonNegative, bool, Default(false)),
488    (NoNotices, bool, Default(false)),
489    (NodeIdentifiers, bool, Default(false)),
490    (Raw, bool, Default(false)),
491    (RawPlans, bool, Default(false)),
492    (RawSyntax, bool, Default(false)),
493    (Redacted, bool, Default(false)),
494    (SubtreeSize, bool, Default(false)),
495    (Timing, bool, Default(false)),
496    (Types, bool, Default(false)),
497    (Equivalences, bool, Default(false)),
498    (ReoptimizeImportedViews, Option<bool>, Default(None)),
499    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
500    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
501    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
502    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
503    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
504    (
505        EnableProjectionPushdownAfterRelationCse,
506        Option<bool>,
507        Default(None)
508    )
509);
510
511impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
512    type Error = PlanError;
513
514    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
515        // If `WITH(raw)` is specified, ensure that the config will be as
516        // representative for the original plan as possible.
517        if v.raw {
518            v.raw_plans = true;
519            v.raw_syntax = true;
520        }
521
522        // Certain config should default to be enabled in release builds running on
523        // staging or prod (where SOFT_ASSERTIONS are turned off).
524        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();
525
526        Ok(ExplainConfig {
527            arity: v.arity.unwrap_or(enable_on_prod),
528            cardinality: v.cardinality,
529            column_names: v.column_names,
530            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
531            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
532            join_impls: v.join_implementations,
533            keys: v.keys,
534            linear_chains: !v.raw_plans && v.linear_chains,
535            no_fast_path: v.no_fast_path,
536            no_notices: v.no_notices,
537            node_ids: v.node_identifiers,
538            non_negative: v.non_negative,
539            raw_plans: v.raw_plans,
540            raw_syntax: v.raw_syntax,
541            verbose_syntax: false,
542            redacted: v.redacted,
543            subtree_size: v.subtree_size,
544            equivalences: v.equivalences,
545            timing: v.timing,
546            types: v.types,
547            // The ones that are initialized with `Default::default()` are not wired up to EXPLAIN.
548            features: OptimizerFeatureOverrides {
549                enable_guard_subquery_tablefunc: Default::default(),
550                enable_eager_delta_joins: v.enable_eager_delta_joins,
551                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
552                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
553                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
554                enable_consolidate_after_union_negate: Default::default(),
555                enable_reduce_mfp_fusion: Default::default(),
556                enable_cardinality_estimates: Default::default(),
557                persist_fast_path_limit: Default::default(),
558                reoptimize_imported_views: v.reoptimize_imported_views,
559                enable_reduce_reduction: Default::default(),
560                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
561                enable_projection_pushdown_after_relation_cse: v
562                    .enable_projection_pushdown_after_relation_cse,
563                enable_less_reduce_in_eqprop: Default::default(),
564                enable_dequadratic_eqprop_map: Default::default(),
565                enable_eq_classes_withholding_errors: Default::default(),
566                enable_fast_path_plan_insights: Default::default(),
567            },
568        })
569    }
570}
571
572fn plan_explainee(
573    scx: &StatementContext,
574    explainee: Explainee<Aug>,
575    params: &Params,
576) -> Result<plan::Explainee, PlanError> {
577    use crate::plan::ExplaineeStatement;
578
579    let is_replan = matches!(
580        explainee,
581        Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
582    );
583
584    let explainee = match explainee {
585        Explainee::View(name) | Explainee::ReplanView(name) => {
586            let item = scx.get_item_by_resolved_name(&name)?;
587            let item_type = item.item_type();
588            if item_type != CatalogItemType::View {
589                sql_bail!("Expected {name} to be a view, not a {item_type}");
590            }
591            match is_replan {
592                true => crate::plan::Explainee::ReplanView(item.id()),
593                false => crate::plan::Explainee::View(item.id()),
594            }
595        }
596        Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
597            let item = scx.get_item_by_resolved_name(&name)?;
598            let item_type = item.item_type();
599            if item_type != CatalogItemType::MaterializedView {
600                sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
601            }
602            match is_replan {
603                true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
604                false => crate::plan::Explainee::MaterializedView(item.id()),
605            }
606        }
607        Explainee::Index(name) | Explainee::ReplanIndex(name) => {
608            let item = scx.get_item_by_resolved_name(&name)?;
609            let item_type = item.item_type();
610            if item_type != CatalogItemType::Index {
611                sql_bail!("Expected {name} to be an index, not a {item_type}");
612            }
613            match is_replan {
614                true => crate::plan::Explainee::ReplanIndex(item.id()),
615                false => crate::plan::Explainee::Index(item.id()),
616            }
617        }
618        Explainee::Select(select, broken) => {
619            let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
620            crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
621        }
622        Explainee::CreateView(mut stmt, broken) => {
623            if stmt.if_exists != IfExistsBehavior::Skip {
624                // If we don't force this parameter to Skip planning will
625                // fail for names that already exist in the catalog. This
626                // can happen even in `Replace` mode if the existing item
627                // has dependencies.
628                stmt.if_exists = IfExistsBehavior::Skip;
629            } else {
630                sql_bail!(
631                    "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
632                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
633                );
634            }
635
636            let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
637                sql_bail!("expected CreateViewPlan plan");
638            };
639
640            crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
641        }
642        Explainee::CreateMaterializedView(mut stmt, broken) => {
643            if stmt.if_exists != IfExistsBehavior::Skip {
644                // If we don't force this parameter to Skip planning will
645                // fail for names that already exist in the catalog. This
646                // can happen even in `Replace` mode if the existing item
647                // has dependencies.
648                stmt.if_exists = IfExistsBehavior::Skip;
649            } else {
650                sql_bail!(
651                    "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
652                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
653                );
654            }
655
656            let Plan::CreateMaterializedView(plan) =
657                ddl::plan_create_materialized_view(scx, *stmt)?
658            else {
659                sql_bail!("expected CreateMaterializedViewPlan plan");
660            };
661
662            crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
663                broken,
664                plan,
665            })
666        }
667        Explainee::CreateIndex(mut stmt, broken) => {
668            if !stmt.if_not_exists {
669                // If we don't force this parameter to true planning will
670                // fail for index items that already exist in the catalog.
671                stmt.if_not_exists = true;
672            } else {
673                sql_bail!(
674                    "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
675                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
676                );
677            }
678
679            let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
680                sql_bail!("expected CreateIndexPlan plan");
681            };
682
683            crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
684        }
685    };
686
687    Ok(explainee)
688}
689
690pub fn plan_explain_plan(
691    scx: &StatementContext,
692    explain: ExplainPlanStatement<Aug>,
693    params: &Params,
694) -> Result<Plan, PlanError> {
695    let (format, verbose_syntax) = match explain.format() {
696        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
697        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
698        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
699        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
700    };
701    let stage = explain.stage();
702
703    // Plan ExplainConfig.
704    let mut config = {
705        let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
706
707        if !scx.catalog.system_vars().persist_stats_filter_enabled() {
708            // If filtering is disabled, explain plans should not include pushdown info.
709            with_options.filter_pushdown = Some(false);
710        }
711
712        ExplainConfig::try_from(with_options)?
713    };
714    config.verbose_syntax = verbose_syntax;
715
716    let explainee = plan_explainee(scx, explain.explainee, params)?;
717
718    Ok(Plan::ExplainPlan(ExplainPlanPlan {
719        stage,
720        format,
721        config,
722        explainee,
723    }))
724}
725
726pub fn plan_explain_schema(
727    scx: &StatementContext,
728    explain_schema: ExplainSinkSchemaStatement<Aug>,
729) -> Result<Plan, PlanError> {
730    let ExplainSinkSchemaStatement {
731        schema_for,
732        // Parser limits to JSON.
733        format: _,
734        mut statement,
735    } = explain_schema;
736
737    // Force the sink's name to one that's guaranteed not to exist, by virtue of
738    // being a non-existent item in a schema under the system's control, so that
739    // `plan_create_sink` doesn't complain about the name already existing.
740    statement.name = Some(UnresolvedItemName::qualified(&[
741        ident!("mz_catalog"),
742        ident!("mz_explain_schema"),
743    ]));
744
745    crate::pure::purify_create_sink_avro_doc_on_options(
746        scx.catalog,
747        *statement.from.item_id(),
748        &mut statement.format,
749    )?;
750
751    match ddl::plan_create_sink(scx, statement)? {
752        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
753            StorageSinkConnection::Kafka(KafkaSinkConnection {
754                format:
755                    KafkaSinkFormat {
756                        key_format,
757                        value_format:
758                            KafkaSinkFormatType::Avro {
759                                schema: value_schema,
760                                ..
761                            },
762                        ..
763                    },
764                ..
765            }) => {
766                let schema = match schema_for {
767                    ExplainSinkSchemaFor::Key => key_format
768                        .and_then(|f| match f {
769                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
770                            _ => None,
771                        })
772                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
773                    ExplainSinkSchemaFor::Value => value_schema,
774                };
775
776                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
777                    sink_from: sink.from,
778                    json_schema: schema,
779                }))
780            }
781            _ => bail_unsupported!(
782                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
783            ),
784        },
785        _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
786    }
787}
788
789pub fn plan_explain_pushdown(
790    scx: &StatementContext,
791    statement: ExplainPushdownStatement<Aug>,
792    params: &Params,
793) -> Result<Plan, PlanError> {
794    scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
795    let explainee = plan_explainee(scx, statement.explainee, params)?;
796    Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
797}
798
799pub fn plan_explain_analyze(
800    scx: &StatementContext,
801    statement: ExplainAnalyzeStatement<Aug>,
802    params: &Params,
803) -> Result<Plan, PlanError> {
804    let explainee_name = statement
805        .explainee
806        .name()
807        .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
808        .full_name_str();
809    let explainee = plan_explainee(scx, statement.explainee, params)?;
810
811    match explainee {
812        plan::Explainee::Index(_index_id) => (),
813        plan::Explainee::MaterializedView(_item_id) => (),
814        _ => {
815            return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
816        }
817    };
818
819    // generate SQL query
820
821    /* WITH {CTEs}
822       SELECT REPEAT(' ', nesting * 2) || operator AS operator
823             {columns}
824        FROM      mz_introspection.mz_lir_mapping mlm
825             JOIN {from} USING (lir_id)
826             JOIN mz_introspection.mz_mappable_objects mo
827               ON (mlm.global_id = mo.global_id)
828       WHERE     mo.name = '{plan.explainee_name}'
829             AND {predicates}
830       ORDER BY lir_id DESC
831    */
832    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
833    let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
834    let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
835    let mut predicates = vec![format!("mo.name = '{}'", explainee_name)];
836    let mut order_by = vec!["mlm.lir_id DESC"];
837
838    match statement.properties {
839        ExplainAnalyzeProperty::Computation { properties, skew } => {
840            let mut worker_id = None;
841            let mut seen_properties = BTreeSet::new();
842            for property in properties {
843                // handle each property only once (belt and suspenders)
844                if !seen_properties.insert(property) {
845                    continue;
846                }
847
848                match property {
849                    ExplainAnalyzeComputationProperty::Memory => {
850                        ctes.push((
851                            "summary_memory",
852                            r#"
853  SELECT mlm.global_id AS global_id,
854         mlm.lir_id AS lir_id,
855         SUM(mas.size) AS total_memory,
856         SUM(mas.records) AS total_records,
857         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
858         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
859    FROM            mz_introspection.mz_lir_mapping mlm
860         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
861               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
862                 ON (mas.operator_id = valid_id)
863GROUP BY mlm.global_id, mlm.lir_id"#,
864                        ));
865                        from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");
866
867                        if skew {
868                            ctes.push((
869                                "per_worker_memory",
870                                r#"
871  SELECT mlm.global_id AS global_id,
872         mlm.lir_id AS lir_id,
873         mas.worker_id AS worker_id,
874         SUM(mas.size) AS worker_memory,
875         SUM(mas.records) AS worker_records
876    FROM            mz_introspection.mz_lir_mapping mlm
877         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
878               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
879                 ON (mas.operator_id = valid_id)
880GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
881                            ));
882                            from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");
883
884                            if let Some(worker_id) = worker_id {
885                                predicates.push(format!("pwm.worker_id = {worker_id}"));
886                            } else {
887                                worker_id = Some("pwm.worker_id");
888                                columns.push("pwm.worker_id AS worker_id");
889                                order_by.push("worker_id");
890                            }
891
892                            columns.extend([
893                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
894                                "pg_size_pretty(pwm.worker_memory) AS worker_memory",
895                                "pg_size_pretty(sm.avg_memory) AS avg_memory",
896                                "pg_size_pretty(sm.total_memory) AS total_memory",
897                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
898                                "pwm.worker_records AS worker_records",
899                                "sm.avg_records AS avg_records",
900                                "sm.total_records AS total_records",
901                            ]);
902                        } else {
903                            columns.extend([
904                                "pg_size_pretty(sm.total_memory) AS total_memory",
905                                "sm.total_records AS total_records",
906                            ]);
907                        }
908                    }
909                    ExplainAnalyzeComputationProperty::Cpu => {
910                        ctes.push((
911                            "summary_cpu",
912                            r#"
913  SELECT mlm.global_id AS global_id,
914         mlm.lir_id AS lir_id,
915         SUM(mse.elapsed_ns) AS total_ns,
916         CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
917    FROM            mz_introspection.mz_lir_mapping mlm
918         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
919               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
920                 ON (mse.id = valid_id)
921GROUP BY mlm.global_id, mlm.lir_id"#,
922                        ));
923                        from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");
924
925                        if skew {
926                            ctes.push((
927                                "per_worker_cpu",
928                                r#"
929  SELECT mlm.global_id AS global_id,
930         mlm.lir_id AS lir_id,
931         mse.worker_id AS worker_id,
932         SUM(mse.elapsed_ns) AS worker_ns
933    FROM            mz_introspection.mz_lir_mapping mlm
934         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
935               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
936                 ON (mse.id = valid_id)
937GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
938                            ));
939                            from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");
940
941                            if let Some(worker_id) = worker_id {
942                                predicates.push(format!("pwc.worker_id = {worker_id}"));
943                            } else {
944                                worker_id = Some("pwc.worker_id");
945                                columns.push("pwc.worker_id AS worker_id");
946                                order_by.push("worker_id");
947                            }
948
949                            columns.extend([
950                                "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
951                                "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
952                                "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
953                            ]);
954                        }
955                        columns.push(
956                            "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
957                        );
958                    }
959                }
960            }
961        }
962        ExplainAnalyzeProperty::Hints => {
963            columns.extend([
964                "megsa.levels AS levels",
965                "megsa.to_cut AS to_cut",
966                "megsa.hint AS hint",
967                "pg_size_pretty(megsa.savings) AS savings",
968            ]);
969            from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
970            "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
971             mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
972        }
973    }
974
975    from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");
976
977    let ctes = if !ctes.is_empty() {
978        format!(
979            "WITH {}",
980            separated(
981                ",\n",
982                ctes.iter()
983                    .map(|(name, defn)| format!("{name} AS ({defn})"))
984            )
985        )
986    } else {
987        String::new()
988    };
989    let columns = separated(", ", columns);
990    let from = separated(" ", from);
991    let predicates = separated(" AND ", predicates);
992    let order_by = separated(", ", order_by);
993    let query = format!(
994        r#"{ctes}
995SELECT {columns}
996FROM {from}
997WHERE {predicates}
998ORDER BY {order_by}"#
999    );
1000
1001    if statement.as_sql {
1002        let rows = vec![Row::pack_slice(&[Datum::String(
1003            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1004                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1005            })?,
1006        )])];
1007        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1008
1009        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1010    } else {
1011        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1012        show_select.plan()
1013    }
1014}
1015
1016pub fn plan_explain_timestamp(
1017    scx: &StatementContext,
1018    explain: ExplainTimestampStatement<Aug>,
1019) -> Result<Plan, PlanError> {
1020    let (format, _verbose_syntax) = match explain.format() {
1021        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1022        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1023        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1024        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1025    };
1026
1027    let raw_plan = {
1028        let query::PlannedRootQuery {
1029            expr: raw_plan,
1030            desc: _,
1031            finishing: _,
1032            scope: _,
1033        } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1034        if raw_plan.contains_parameters()? {
1035            return Err(PlanError::ParameterNotAllowed(
1036                "EXPLAIN TIMESTAMP".to_string(),
1037            ));
1038        }
1039
1040        raw_plan
1041    };
1042    let when = query::plan_as_of(scx, explain.select.as_of)?;
1043
1044    Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1045        format,
1046        raw_plan,
1047        when,
1048    }))
1049}
1050
1051/// Plans and decorrelates a [`Query`]. Like [`query::plan_root_query`], but
1052/// returns an [`MirRelationExpr`], which cannot include correlated expressions.
1053#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."]
1054pub fn plan_query(
1055    scx: &StatementContext,
1056    query: Query<Aug>,
1057    params: &Params,
1058    lifetime: QueryLifetime,
1059) -> Result<query::PlannedRootQuery<MirRelationExpr>, PlanError> {
1060    let query::PlannedRootQuery {
1061        mut expr,
1062        desc,
1063        finishing,
1064        scope,
1065    } = query::plan_root_query(scx, query, lifetime)?;
1066    expr.bind_parameters(scx, lifetime, params)?;
1067
1068    Ok(query::PlannedRootQuery {
1069        // No metrics passed! One more reason not to use this deprecated function.
1070        expr: expr.lower(scx.catalog.system_vars(), None)?,
1071        desc,
1072        finishing,
1073        scope,
1074    })
1075}
1076
// Generates `SubscribeOptionExtracted`, the typed extraction of
// `SubscribeOption`s used by `describe_subscribe` and `plan_subscribe`. It has
// optional boolean `snapshot` and `progress` fields, which callers default with
// `unwrap_or`.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1078
1079pub fn describe_subscribe(
1080    scx: &StatementContext,
1081    stmt: SubscribeStatement<Aug>,
1082) -> Result<StatementDesc, PlanError> {
1083    let relation_desc = match stmt.relation {
1084        SubscribeRelation::Name(name) => {
1085            let item = scx.get_item_by_resolved_name(&name)?;
1086            item.desc(&scx.catalog.resolve_full_name(item.name()))?
1087                .into_owned()
1088        }
1089        SubscribeRelation::Query(query) => {
1090            let query::PlannedRootQuery { desc, .. } =
1091                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
1092            desc
1093        }
1094    };
1095    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
1096    let progress = progress.unwrap_or(false);
1097    let mut desc = RelationDesc::builder().with_column(
1098        "mz_timestamp",
1099        SqlScalarType::Numeric {
1100            max_scale: Some(NumericMaxScale::ZERO),
1101        }
1102        .nullable(false),
1103    );
1104    if progress {
1105        desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
1106    }
1107
1108    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
1109    match stmt.output {
1110        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
1111            desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
1112            for (name, mut ty) in relation_desc.into_iter() {
1113                if progress {
1114                    ty.nullable = true;
1115                }
1116                desc = desc.with_column(name, ty);
1117            }
1118        }
1119        SubscribeOutput::EnvelopeUpsert { key_columns }
1120        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
1121            desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
1122            let key_columns = key_columns
1123                .into_iter()
1124                .map(normalize::column_name)
1125                .collect_vec();
1126            let mut before_values_desc = RelationDesc::builder();
1127            let mut after_values_desc = RelationDesc::builder();
1128
1129            // Add the key columns in the order that they're specified.
1130            for column_name in &key_columns {
1131                let mut column_ty = relation_desc
1132                    .get_by_name(column_name)
1133                    .map(|(_pos, ty)| ty.clone())
1134                    .ok_or_else(|| PlanError::UnknownColumn {
1135                        table: None,
1136                        column: column_name.clone(),
1137                        similar: Box::new([]),
1138                    })?;
1139                if progress {
1140                    column_ty.nullable = true;
1141                }
1142                desc = desc.with_column(column_name, column_ty);
1143            }
1144
1145            // Then add the remaining columns in the order from the original
1146            // table, filtering out the key columns since we added those above.
1147            for (mut name, mut ty) in relation_desc
1148                .into_iter()
1149                .filter(|(name, _ty)| !key_columns.contains(name))
1150            {
1151                ty.nullable = true;
1152                before_values_desc =
1153                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
1154                if debezium {
1155                    name = format!("after_{}", name).into();
1156                }
1157                after_values_desc = after_values_desc.with_column(name, ty);
1158            }
1159
1160            if debezium {
1161                desc = desc.concat(before_values_desc);
1162            }
1163            desc = desc.concat(after_values_desc);
1164        }
1165    }
1166    Ok(StatementDesc::new(Some(desc.finish())))
1167}
1168
1169pub fn plan_subscribe(
1170    scx: &StatementContext,
1171    SubscribeStatement {
1172        relation,
1173        options,
1174        as_of,
1175        up_to,
1176        output,
1177    }: SubscribeStatement<Aug>,
1178    params: &Params,
1179    copy_to: Option<CopyFormat>,
1180) -> Result<Plan, PlanError> {
1181    let (from, desc, scope) = match relation {
1182        SubscribeRelation::Name(name) => {
1183            let entry = scx.get_item_by_resolved_name(&name)?;
1184            let desc = match entry.desc(&scx.catalog.resolve_full_name(entry.name())) {
1185                Ok(desc) => desc,
1186                Err(..) => sql_bail!(
1187                    "'{}' cannot be subscribed to because it is a {}",
1188                    name.full_name_str(),
1189                    entry.item_type(),
1190                ),
1191            };
1192            let item_name = match name {
1193                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
1194                _ => None,
1195            };
1196            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
1197            (
1198                SubscribeFrom::Id(entry.global_id()),
1199                desc.into_owned(),
1200                scope,
1201            )
1202        }
1203        SubscribeRelation::Query(query) => {
1204            #[allow(deprecated)] // TODO(aalexandrov): Use HirRelationExpr in Subscribe
1205            let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?;
1206            // There's no way to apply finishing operations to a `SUBSCRIBE` directly, so the
1207            // finishing should have already been turned into a `TopK` by
1208            // `plan_query` / `plan_root_query`, upon seeing the `QueryLifetime::Subscribe`.
1209            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
1210                &query.finishing,
1211                query.desc.arity()
1212            ));
1213            let desc = query.desc.clone();
1214            (
1215                SubscribeFrom::Query {
1216                    expr: query.expr,
1217                    desc: query.desc,
1218                },
1219                desc,
1220                query.scope,
1221            )
1222        }
1223    };
1224
1225    let when = query::plan_as_of(scx, as_of)?;
1226    let up_to = up_to
1227        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
1228        .transpose()?;
1229
1230    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
1231    let ecx = ExprContext {
1232        qcx: &qcx,
1233        name: "",
1234        scope: &scope,
1235        relation_type: desc.typ(),
1236        allow_aggregates: false,
1237        allow_subqueries: true,
1238        allow_parameters: true,
1239        allow_windows: false,
1240    };
1241
1242    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1243    let output = match output {
1244        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
1245        SubscribeOutput::EnvelopeUpsert { key_columns } => {
1246            let order_by = key_columns
1247                .iter()
1248                .map(|ident| OrderByExpr {
1249                    expr: Expr::Identifier(vec![ident.clone()]),
1250                    asc: None,
1251                    nulls_last: None,
1252                })
1253                .collect_vec();
1254            let (order_by, map_exprs) = query::plan_order_by_exprs(
1255                &ExprContext {
1256                    name: "ENVELOPE UPSERT KEY clause",
1257                    ..ecx
1258                },
1259                &order_by[..],
1260                &output_columns[..],
1261            )?;
1262            if !map_exprs.is_empty() {
1263                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
1264            }
1265            plan::SubscribeOutput::EnvelopeUpsert {
1266                order_by_keys: order_by,
1267            }
1268        }
1269        SubscribeOutput::EnvelopeDebezium { key_columns } => {
1270            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
1271            let order_by = key_columns
1272                .iter()
1273                .map(|ident| OrderByExpr {
1274                    expr: Expr::Identifier(vec![ident.clone()]),
1275                    asc: None,
1276                    nulls_last: None,
1277                })
1278                .collect_vec();
1279            let (order_by, map_exprs) = query::plan_order_by_exprs(
1280                &ExprContext {
1281                    name: "ENVELOPE DEBEZIUM KEY clause",
1282                    ..ecx
1283                },
1284                &order_by[..],
1285                &output_columns[..],
1286            )?;
1287            if !map_exprs.is_empty() {
1288                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
1289            }
1290            plan::SubscribeOutput::EnvelopeDebezium {
1291                order_by_keys: order_by,
1292            }
1293        }
1294        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
1295            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
1296            let mz_diff = "mz_diff".into();
1297            let output_columns = std::iter::once((0, &mz_diff))
1298                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
1299                .collect_vec();
1300            match query::plan_order_by_exprs(
1301                &ExprContext {
1302                    name: "WITHIN TIMESTAMP ORDER BY clause",
1303                    ..ecx
1304                },
1305                &order_by[..],
1306                &output_columns[..],
1307            ) {
1308                Err(PlanError::UnknownColumn {
1309                    table: None,
1310                    column,
1311                    similar: _,
1312                }) if &column == &mz_diff => {
1313                    // mz_diff is being used in an expression. Since mz_diff isn't part of the table
1314                    // it looks like an unknown column. Instead, return a better error
1315                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1316                }
1317                Err(e) => return Err(e),
1318                Ok((order_by, map_exprs)) => {
1319                    if !map_exprs.is_empty() {
1320                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1321                    }
1322
1323                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
1324                }
1325            }
1326        }
1327    };
1328
1329    let SubscribeOptionExtracted {
1330        progress, snapshot, ..
1331    } = options.try_into()?;
1332    Ok(Plan::Subscribe(SubscribePlan {
1333        from,
1334        when,
1335        up_to,
1336        with_snapshot: snapshot.unwrap_or(true),
1337        copy_to,
1338        emit_progress: progress.unwrap_or(false),
1339        output,
1340    }))
1341}
1342
1343pub fn describe_copy_from_table(
1344    scx: &StatementContext,
1345    table_name: <Aug as AstInfo>::ItemName,
1346    columns: Vec<Ident>,
1347) -> Result<StatementDesc, PlanError> {
1348    let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1349    Ok(StatementDesc::new(Some(desc)))
1350}
1351
1352pub fn describe_copy_item(
1353    scx: &StatementContext,
1354    object_name: <Aug as AstInfo>::ItemName,
1355    columns: Vec<Ident>,
1356) -> Result<StatementDesc, PlanError> {
1357    let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1358    Ok(StatementDesc::new(Some(desc)))
1359}
1360
1361pub fn describe_copy(
1362    scx: &StatementContext,
1363    CopyStatement {
1364        relation,
1365        direction,
1366        ..
1367    }: CopyStatement<Aug>,
1368) -> Result<StatementDesc, PlanError> {
1369    Ok(match (relation, direction) {
1370        (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1371            describe_copy_item(scx, name, columns)?
1372        }
1373        (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1374            describe_copy_from_table(scx, name, columns)?
1375        }
1376        (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1377        (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1378    }
1379    .with_is_copy())
1380}
1381
1382fn plan_copy_to_expr(
1383    scx: &StatementContext,
1384    select_plan: SelectPlan,
1385    desc: RelationDesc,
1386    to: &Expr<Aug>,
1387    format: CopyFormat,
1388    options: CopyOptionExtracted,
1389) -> Result<Plan, PlanError> {
1390    let conn_id = match options.aws_connection {
1391        Some(conn_id) => CatalogItemId::from(conn_id),
1392        None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1393    };
1394    let connection = scx.get_item(&conn_id).connection()?;
1395
1396    match connection {
1397        mz_storage_types::connections::Connection::Aws(_) => {}
1398        _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1399    }
1400
1401    let format = match format {
1402        CopyFormat::Csv => {
1403            let quote = extract_byte_param_value(options.quote, "quote")?;
1404            let escape = extract_byte_param_value(options.escape, "escape")?;
1405            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1406            S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1407                CopyCsvFormatParams::try_new(
1408                    delimiter,
1409                    quote,
1410                    escape,
1411                    options.header,
1412                    options.null,
1413                )
1414                .map_err(|e| sql_err!("{}", e))?,
1415            ))
1416        }
1417        CopyFormat::Parquet => {
1418            // Validate that the output desc can be formatted as parquet
1419            ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
1420            S3SinkFormat::Parquet
1421        }
1422        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1423        CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1424    };
1425
1426    // Converting the to expr to a HirScalarExpr
1427    let mut to_expr = to.clone();
1428    transform_ast::transform(scx, &mut to_expr)?;
1429    let relation_type = RelationDesc::empty();
1430    let ecx = &ExprContext {
1431        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1432        name: "COPY TO target",
1433        scope: &Scope::empty(),
1434        relation_type: relation_type.typ(),
1435        allow_aggregates: false,
1436        allow_subqueries: false,
1437        allow_parameters: false,
1438        allow_windows: false,
1439    };
1440
1441    let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;
1442
1443    if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1444        sql_bail!(
1445            "MAX FILE SIZE cannot be less than {}",
1446            MIN_S3_SINK_FILE_SIZE
1447        );
1448    }
1449    if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1450        sql_bail!(
1451            "MAX FILE SIZE cannot be greater than {}",
1452            MAX_S3_SINK_FILE_SIZE
1453        );
1454    }
1455
1456    Ok(Plan::CopyTo(CopyToPlan {
1457        select_plan,
1458        desc,
1459        to,
1460        connection: connection.to_owned(),
1461        connection_id: conn_id,
1462        format,
1463        max_file_size: options.max_file_size.as_bytes(),
1464    }))
1465}
1466
1467fn plan_copy_from(
1468    scx: &StatementContext,
1469    target: &CopyTarget<Aug>,
1470    table_name: ResolvedItemName,
1471    columns: Vec<Ident>,
1472    format: CopyFormat,
1473    options: CopyOptionExtracted,
1474) -> Result<Plan, PlanError> {
1475    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
1476        match option {
1477            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
1478            None => Ok(()),
1479        }
1480    }
1481
1482    let source = match target {
1483        CopyTarget::Stdin => CopyFromSource::Stdin,
1484        CopyTarget::Expr(from) => {
1485            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;
1486
1487            // Converting the expr to an HirScalarExpr
1488            let mut from_expr = from.clone();
1489            transform_ast::transform(scx, &mut from_expr)?;
1490            let relation_type = RelationDesc::empty();
1491            let ecx = &ExprContext {
1492                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1493                name: "COPY FROM target",
1494                scope: &Scope::empty(),
1495                relation_type: relation_type.typ(),
1496                allow_aggregates: false,
1497                allow_subqueries: false,
1498                allow_parameters: false,
1499                allow_windows: false,
1500            };
1501            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;
1502
1503            match options.aws_connection {
1504                Some(conn_id) => {
1505                    let conn_id = CatalogItemId::from(conn_id);
1506
1507                    // Validate the connection type is one we expect.
1508                    let connection = match scx.get_item(&conn_id).connection()? {
1509                        mz_storage_types::connections::Connection::Aws(conn) => conn,
1510                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
1511                    };
1512
1513                    CopyFromSource::AwsS3 {
1514                        uri: from,
1515                        connection,
1516                        connection_id: conn_id,
1517                    }
1518                }
1519                None => CopyFromSource::Url(from),
1520            }
1521        }
1522        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
1523    };
1524
1525    let params = match format {
1526        CopyFormat::Text => {
1527            only_available_with_csv(options.quote, "quote")?;
1528            only_available_with_csv(options.escape, "escape")?;
1529            only_available_with_csv(options.header, "HEADER")?;
1530            let delimiter =
1531                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
1532            let null = match options.null {
1533                Some(null) => Cow::from(null),
1534                None => Cow::from("\\N"),
1535            };
1536            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
1537        }
1538        CopyFormat::Csv => {
1539            let quote = extract_byte_param_value(options.quote, "quote")?;
1540            let escape = extract_byte_param_value(options.escape, "escape")?;
1541            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1542            CopyFormatParams::Csv(
1543                CopyCsvFormatParams::try_new(
1544                    delimiter,
1545                    quote,
1546                    escape,
1547                    options.header,
1548                    options.null,
1549                )
1550                .map_err(|e| sql_err!("{}", e))?,
1551            )
1552        }
1553        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1554        CopyFormat::Parquet => CopyFormatParams::Parquet,
1555    };
1556
1557    let filter = match (options.files, options.pattern) {
1558        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
1559        (Some(files), None) => Some(CopyFromFilter::Files(files)),
1560        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
1561        (None, None) => None,
1562    };
1563
1564    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
1565        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
1566    }
1567
1568    let table_name_string = table_name.full_name_str();
1569
1570    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;
1571
1572    let Some(mfp) = maybe_mfp else {
1573        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
1574    };
1575
1576    Ok(Plan::CopyFrom(CopyFromPlan {
1577        target_id: id,
1578        target_name: table_name_string,
1579        source,
1580        columns,
1581        source_desc,
1582        mfp,
1583        params,
1584        filter,
1585    }))
1586}
1587
1588fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
1589    match v {
1590        Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
1591        Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
1592        None => Ok(None),
1593    }
1594}
1595
// Declares the `WITH (...)` options accepted by `COPY` and generates
// `CopyOptionExtracted`, which `plan_copy` builds via `try_from`. Options
// without a default surface as `Option`s, and callers check for their presence.
// `MaxFileSize` is the exception: it defaults to 256 MB.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    // Used as the AWS connection for remote COPY FROM, and passed along to
    // COPY TO <expr> planning.
    (AwsConnection, with_options::Object),
    // Compared against the S3 sink min/max file sizes when planning COPY TO <expr>.
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    // FILES and PATTERN are mutually exclusive filters for COPY FROM.
    (Files, Vec<String>),
    (Pattern, String)
);
1609
1610pub fn plan_copy(
1611    scx: &StatementContext,
1612    CopyStatement {
1613        relation,
1614        direction,
1615        target,
1616        options,
1617    }: CopyStatement<Aug>,
1618) -> Result<Plan, PlanError> {
1619    let options = CopyOptionExtracted::try_from(options)?;
1620    // Parse any user-provided FORMAT option. If not provided, will default to
1621    // Text for COPY TO STDOUT and COPY FROM STDIN, but will error for COPY TO <expr>.
1622    let format = options
1623        .format
1624        .as_ref()
1625        .map(|format| match format.to_lowercase().as_str() {
1626            "text" => Ok(CopyFormat::Text),
1627            "csv" => Ok(CopyFormat::Csv),
1628            "binary" => Ok(CopyFormat::Binary),
1629            "parquet" => Ok(CopyFormat::Parquet),
1630            _ => sql_bail!("unknown FORMAT: {}", format),
1631        })
1632        .transpose()?;
1633
1634    match (&direction, &target) {
1635        (CopyDirection::To, CopyTarget::Stdout) => {
1636            if options.delimiter.is_some() {
1637                sql_bail!("COPY TO does not support DELIMITER option yet");
1638            }
1639            if options.quote.is_some() {
1640                sql_bail!("COPY TO does not support QUOTE option yet");
1641            }
1642            if options.null.is_some() {
1643                sql_bail!("COPY TO does not support NULL option yet");
1644            }
1645            match relation {
1646                CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
1647                CopyRelation::Select(stmt) => Ok(plan_select(
1648                    scx,
1649                    stmt,
1650                    &Params::empty(),
1651                    Some(format.unwrap_or(CopyFormat::Text)),
1652                )?),
1653                CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
1654                    scx,
1655                    stmt,
1656                    &Params::empty(),
1657                    Some(format.unwrap_or(CopyFormat::Text)),
1658                )?),
1659            }
1660        }
1661        (CopyDirection::From, target) => match relation {
1662            CopyRelation::Named { name, columns } => plan_copy_from(
1663                scx,
1664                target,
1665                name,
1666                columns,
1667                format.unwrap_or(CopyFormat::Text),
1668                options,
1669            ),
1670            _ => sql_bail!("COPY FROM {} not supported", target),
1671        },
1672        (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
1673            // System users are always allowed to use this feature, even when
1674            // the flag is disabled, so that we can dogfood for analytics in
1675            // production environments. The feature is stable enough that we're
1676            // not worried about it crashing.
1677            if !scx.catalog.active_role_id().is_system() {
1678                scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
1679            }
1680
1681            let format = match format {
1682                Some(inner) => inner,
1683                _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
1684            };
1685
1686            let stmt = match relation {
1687                CopyRelation::Named { name, columns } => {
1688                    if !columns.is_empty() {
1689                        // TODO(mouli): Add support for this
1690                        sql_bail!(
1691                            "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
1692                        );
1693                    }
1694                    // Generate a synthetic SELECT query that just gets the table
1695                    let query = Query {
1696                        ctes: CteBlock::empty(),
1697                        body: SetExpr::Table(name),
1698                        order_by: vec![],
1699                        limit: None,
1700                        offset: None,
1701                    };
1702                    SelectStatement { query, as_of: None }
1703                }
1704                CopyRelation::Select(stmt) => {
1705                    if !stmt.query.order_by.is_empty() {
1706                        sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
1707                    }
1708                    stmt
1709                }
1710                _ => sql_bail!("COPY {} {} not supported", direction, target),
1711            };
1712
1713            let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
1714            plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
1715        }
1716        _ => sql_bail!("COPY {} {} not supported", direction, target),
1717    }
1718}