mz_sql/plan/statement/
dml.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data manipulation language (DML).
11//!
12//! This module houses the handlers for statements that manipulate data, like
13//! `INSERT`, `SELECT`, `SUBSCRIBE`, and `COPY`.
14
15use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19
20use mz_arrow_util::builder::ArrowBuilder;
21use mz_expr::visit::Visit;
22use mz_expr::{MirRelationExpr, RowSetFinishing};
23use mz_ore::num::NonNeg;
24use mz_ore::soft_panic_or_log;
25use mz_ore::str::separated;
26use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
27use mz_repr::adt::numeric::NumericMaxScale;
28use mz_repr::bytes::ByteSize;
29use mz_repr::explain::{ExplainConfig, ExplainFormat};
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::{CatalogItemId, Datum, RelationDesc, RelationType, Row, ScalarType};
32use mz_sql_parser::ast::{
33    CteBlock, ExplainAnalyzeComputationProperty, ExplainAnalyzeProperty, ExplainAnalyzeStatement,
34    ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
35    ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
36    SetExpr, SubscribeOutput, UnresolvedItemName,
37};
38use mz_sql_parser::ident;
39use mz_storage_types::sinks::{
40    KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
41    MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
42};
43
44use crate::ast::display::AstDisplay;
45use crate::ast::{
46    AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
47    DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
48    SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
49    UpdateStatement,
50};
51use crate::catalog::CatalogItemType;
52use crate::names::{Aug, ResolvedItemName};
53use crate::normalize;
54use crate::plan::query::{ExprContext, QueryLifetime, offset_into_value, plan_expr, plan_up_to};
55use crate::plan::scope::Scope;
56use crate::plan::statement::show::ShowSelect;
57use crate::plan::statement::{StatementContext, StatementDesc, ddl};
58use crate::plan::{
59    self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
60    ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
61};
62use crate::plan::{
63    CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
64    QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
65};
66use crate::plan::{CopyFromSource, with_options};
67use crate::session::vars::{self, ENABLE_COPY_FROM_REMOTE};
68
69// TODO(benesch): currently, describing a `SELECT` or `INSERT` query
70// plans the whole query to determine its shape and parameter types,
71// and then throws away that plan. If we were smarter, we'd stash that
72// plan somewhere so we don't have to recompute it when the query is
73// executed.
74
75pub fn describe_insert(
76    scx: &StatementContext,
77    InsertStatement {
78        table_name,
79        columns,
80        source,
81        returning,
82    }: InsertStatement<Aug>,
83) -> Result<StatementDesc, PlanError> {
84    let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
85    let desc = if returning.expr.is_empty() {
86        None
87    } else {
88        Some(returning.desc)
89    };
90    Ok(StatementDesc::new(desc))
91}
92
93pub fn plan_insert(
94    scx: &StatementContext,
95    InsertStatement {
96        table_name,
97        columns,
98        source,
99        returning,
100    }: InsertStatement<Aug>,
101    params: &Params,
102) -> Result<Plan, PlanError> {
103    let (id, mut expr, returning) =
104        query::plan_insert_query(scx, table_name, columns, source, returning)?;
105    expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
106    let returning = returning
107        .expr
108        .into_iter()
109        .map(|mut expr| {
110            expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
111            expr.lower_uncorrelated()
112        })
113        .collect::<Result<Vec<_>, _>>()?;
114
115    Ok(Plan::Insert(InsertPlan {
116        id,
117        values: expr,
118        returning,
119    }))
120}
121
122pub fn describe_delete(
123    scx: &StatementContext,
124    stmt: DeleteStatement<Aug>,
125) -> Result<StatementDesc, PlanError> {
126    query::plan_delete_query(scx, stmt)?;
127    Ok(StatementDesc::new(None))
128}
129
130pub fn plan_delete(
131    scx: &StatementContext,
132    stmt: DeleteStatement<Aug>,
133    params: &Params,
134) -> Result<Plan, PlanError> {
135    let rtw_plan = query::plan_delete_query(scx, stmt)?;
136    plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
137}
138
139pub fn describe_update(
140    scx: &StatementContext,
141    stmt: UpdateStatement<Aug>,
142) -> Result<StatementDesc, PlanError> {
143    query::plan_update_query(scx, stmt)?;
144    Ok(StatementDesc::new(None))
145}
146
147pub fn plan_update(
148    scx: &StatementContext,
149    stmt: UpdateStatement<Aug>,
150    params: &Params,
151) -> Result<Plan, PlanError> {
152    let rtw_plan = query::plan_update_query(scx, stmt)?;
153    plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
154}
155
156pub fn plan_read_then_write(
157    scx: &StatementContext,
158    kind: MutationKind,
159    params: &Params,
160    query::ReadThenWritePlan {
161        id,
162        mut selection,
163        finishing,
164        assignments,
165    }: query::ReadThenWritePlan,
166) -> Result<Plan, PlanError> {
167    selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
168    let mut assignments_outer = BTreeMap::new();
169    for (idx, mut set) in assignments {
170        set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
171        let set = set.lower_uncorrelated()?;
172        assignments_outer.insert(idx, set);
173    }
174
175    Ok(Plan::ReadThenWrite(ReadThenWritePlan {
176        id,
177        selection,
178        finishing,
179        assignments: assignments_outer,
180        kind,
181        returning: Vec::new(),
182    }))
183}
184
185pub fn describe_select(
186    scx: &StatementContext,
187    stmt: SelectStatement<Aug>,
188) -> Result<StatementDesc, PlanError> {
189    if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
190        return Ok(StatementDesc::new(Some(desc)));
191    }
192
193    let query::PlannedRootQuery { desc, .. } =
194        query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
195    Ok(StatementDesc::new(Some(desc)))
196}
197
198pub fn plan_select(
199    scx: &StatementContext,
200    select: SelectStatement<Aug>,
201    params: &Params,
202    copy_to: Option<CopyFormat>,
203) -> Result<Plan, PlanError> {
204    if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
205        return Ok(Plan::SideEffectingFunc(f));
206    }
207
208    let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
209    Ok(Plan::Select(plan))
210}
211
212fn plan_select_inner(
213    scx: &StatementContext,
214    select: SelectStatement<Aug>,
215    params: &Params,
216    copy_to: Option<CopyFormat>,
217) -> Result<(SelectPlan, RelationDesc), PlanError> {
218    let when = query::plan_as_of(scx, select.as_of.clone())?;
219    let lifetime = QueryLifetime::OneShot;
220    let query::PlannedRootQuery {
221        mut expr,
222        desc,
223        finishing,
224        scope: _,
225    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
226    expr.bind_parameters(scx, lifetime, params)?;
227
228    // OFFSET clauses in `expr` should become constants with the above binding of parameters.
229    // Let's check this and simplify them to literals.
230    expr.try_visit_mut_pre(&mut |expr| {
231        if let HirRelationExpr::TopK { offset, .. } = expr {
232            let offset_value = offset_into_value(offset.take())?;
233            *offset = HirScalarExpr::literal(Datum::Int64(offset_value), ScalarType::Int64);
234        }
235        Ok::<(), PlanError>(())
236    })?;
237    // (We don't need to simplify LIMIT clauses in `expr`, because we can handle non-constant
238    // expressions there. If they happen to be simplifiable to literals, then the optimizer will do
239    // so later.)
240
241    // We need to concretize the `limit` and `offset` of the RowSetFinishing, so that we go from
242    // `RowSetFinishing<HirScalarExpr, HirScalarExpr>` to `RowSetFinishing`.
243    // This involves binding parameters and evaluating each expression to a number.
244    // (This should be possible even for `limit` here, because we are at the top level of a SELECT,
245    // so this `limit` has to be a constant.)
246    let limit = match finishing.limit {
247        None => None,
248        Some(mut limit) => {
249            limit.bind_parameters(scx, lifetime, params)?;
250            // TODO: Call `try_into_literal_int64` instead of `as_literal`.
251            let Some(limit) = limit.as_literal() else {
252                sql_bail!(
253                    "Top-level LIMIT must be a constant expression, got {}",
254                    limit
255                )
256            };
257            match limit {
258                Datum::Null => None,
259                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
260                _ => {
261                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
262                    sql_bail!("LIMIT must be a non-negative INT or NULL")
263                }
264            }
265        }
266    };
267    let offset = {
268        let mut offset = finishing.offset.clone();
269        offset.bind_parameters(scx, lifetime, params)?;
270        let offset = offset_into_value(offset.take())?;
271        offset
272            .try_into()
273            .expect("checked in offset_into_value that it is not negative")
274    };
275
276    let plan = SelectPlan {
277        source: expr,
278        when,
279        finishing: RowSetFinishing {
280            limit,
281            offset,
282            project: finishing.project,
283            order_by: finishing.order_by,
284        },
285        copy_to,
286        select: Some(Box::new(select)),
287    };
288
289    Ok((plan, desc))
290}
291
292pub fn describe_explain_plan(
293    scx: &StatementContext,
294    explain: ExplainPlanStatement<Aug>,
295) -> Result<StatementDesc, PlanError> {
296    let mut relation_desc = RelationDesc::builder();
297
298    match explain.stage() {
299        ExplainStage::RawPlan => {
300            let name = "Raw Plan";
301            relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
302        }
303        ExplainStage::DecorrelatedPlan => {
304            let name = "Decorrelated Plan";
305            relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
306        }
307        ExplainStage::LocalPlan => {
308            let name = "Locally Optimized Plan";
309            relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
310        }
311        ExplainStage::GlobalPlan => {
312            let name = "Optimized Plan";
313            relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
314        }
315        ExplainStage::PhysicalPlan => {
316            let name = "Physical Plan";
317            relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
318        }
319        ExplainStage::Trace => {
320            relation_desc = relation_desc
321                .with_column("Time", ScalarType::UInt64.nullable(false))
322                .with_column("Path", ScalarType::String.nullable(false))
323                .with_column("Plan", ScalarType::String.nullable(false));
324        }
325        ExplainStage::PlanInsights => {
326            let name = "Plan Insights";
327            relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
328        }
329    };
330    let relation_desc = relation_desc.finish();
331
332    Ok(
333        StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
334            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
335            _ => vec![],
336        }),
337    )
338}
339
340pub fn describe_explain_pushdown(
341    scx: &StatementContext,
342    statement: ExplainPushdownStatement<Aug>,
343) -> Result<StatementDesc, PlanError> {
344    let relation_desc = RelationDesc::builder()
345        .with_column("Source", ScalarType::String.nullable(false))
346        .with_column("Total Bytes", ScalarType::UInt64.nullable(false))
347        .with_column("Selected Bytes", ScalarType::UInt64.nullable(false))
348        .with_column("Total Parts", ScalarType::UInt64.nullable(false))
349        .with_column("Selected Parts", ScalarType::UInt64.nullable(false))
350        .finish();
351
352    Ok(
353        StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
354            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
355            _ => vec![],
356        }),
357    )
358}
359
360pub fn describe_explain_analyze(
361    _scx: &StatementContext,
362    statement: ExplainAnalyzeStatement<Aug>,
363) -> Result<StatementDesc, PlanError> {
364    if statement.as_sql {
365        let relation_desc = RelationDesc::builder()
366            .with_column("SQL", ScalarType::String.nullable(false))
367            .finish();
368        return Ok(StatementDesc::new(Some(relation_desc)));
369    }
370
371    match statement.properties {
372        ExplainAnalyzeProperty::Computation { properties, skew } => {
373            let mut relation_desc =
374                RelationDesc::builder().with_column("operator", ScalarType::String.nullable(false));
375
376            if skew {
377                relation_desc =
378                    relation_desc.with_column("worker_id", ScalarType::UInt64.nullable(true));
379            }
380
381            let mut seen_properties = BTreeSet::new();
382            for property in properties {
383                // handle each property only once (belt and suspenders)
384                if !seen_properties.insert(property) {
385                    continue;
386                }
387
388                match property {
389                    ExplainAnalyzeComputationProperty::Memory if skew => {
390                        let numeric = ScalarType::Numeric { max_scale: None }.nullable(true);
391                        relation_desc = relation_desc
392                            .with_column("memory_ratio", numeric.clone())
393                            .with_column("worker_memory", ScalarType::String.nullable(true))
394                            .with_column("avg_memory", ScalarType::String.nullable(true))
395                            .with_column("total_memory", ScalarType::String.nullable(true))
396                            .with_column("records_ratio", numeric.clone())
397                            .with_column("worker_records", numeric.clone())
398                            .with_column("avg_records", numeric.clone())
399                            .with_column("total_records", numeric);
400                    }
401                    ExplainAnalyzeComputationProperty::Memory => {
402                        relation_desc = relation_desc
403                            .with_column("total_memory", ScalarType::String.nullable(true))
404                            .with_column(
405                                "total_records",
406                                ScalarType::Numeric { max_scale: None }.nullable(true),
407                            );
408                    }
409                    ExplainAnalyzeComputationProperty::Cpu => {
410                        if skew {
411                            relation_desc = relation_desc
412                                .with_column(
413                                    "cpu_ratio",
414                                    ScalarType::Numeric { max_scale: None }.nullable(true),
415                                )
416                                .with_column("worker_elapsed", ScalarType::Interval.nullable(true))
417                                .with_column("avg_elapsed", ScalarType::Interval.nullable(true));
418                        }
419                        relation_desc = relation_desc
420                            .with_column("total_elapsed", ScalarType::Interval.nullable(true));
421                    }
422                }
423            }
424
425            let relation_desc = relation_desc.finish();
426            Ok(StatementDesc::new(Some(relation_desc)))
427        }
428        ExplainAnalyzeProperty::Hints => {
429            let relation_desc = RelationDesc::builder()
430                .with_column("operator", ScalarType::String.nullable(true))
431                .with_column("levels", ScalarType::Int64.nullable(true))
432                .with_column("to_cut", ScalarType::Int64.nullable(true))
433                .with_column("hint", ScalarType::Float64.nullable(true))
434                .with_column("savings", ScalarType::String.nullable(true))
435                .finish();
436            Ok(StatementDesc::new(Some(relation_desc)))
437        }
438    }
439}
440
441pub fn describe_explain_timestamp(
442    scx: &StatementContext,
443    ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
444) -> Result<StatementDesc, PlanError> {
445    let relation_desc = RelationDesc::builder()
446        .with_column("Timestamp", ScalarType::String.nullable(false))
447        .finish();
448
449    Ok(StatementDesc::new(Some(relation_desc))
450        .with_params(describe_select(scx, select)?.param_types))
451}
452
453pub fn describe_explain_schema(
454    _: &StatementContext,
455    ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
456) -> Result<StatementDesc, PlanError> {
457    let relation_desc = RelationDesc::builder()
458        .with_column("Schema", ScalarType::String.nullable(false))
459        .finish();
460    Ok(StatementDesc::new(Some(relation_desc)))
461}
462
// Options accepted by `EXPLAIN ... WITH (...)`. This invocation is where
// `ExplainPlanOptionExtracted` comes from. `plan_explain_plan` builds that
// struct from the statement's `WITH` options and converts it into an
// `ExplainConfig`.
//
// Currently, there are two reasons why a flag should be `Option<bool>` instead of simply
// `bool`:
// - When it's an override of a global feature flag, for example optimizer feature flags. In this
//   case, we need not just false and true, but also None to say "take the value of the global
//   flag".
// - When it's an override of whether SOFT_ASSERTIONS are enabled. For example, when `Arity` is not
//   explicitly given in the EXPLAIN command, then we'd like staging and prod to default to true,
//   but otherwise we'd like to default to false.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
505
/// Converts the extracted `EXPLAIN ... WITH (...)` options into an
/// [`ExplainConfig`].
///
/// - `WITH (raw)` implies both `raw_plans` and `raw_syntax`.
/// - `arity`, `filter_pushdown` and `humanized_expressions` may be left
///   unspecified. They then default to `true` when soft assertions are
///   disabled and to `false` when they are enabled.
/// - `raw_plans` forces `humanized_exprs` and `linear_chains` off.
/// - `verbose_syntax` is always initialized to `false` here.
///
/// Optimizer feature overrides that are not exposed as `EXPLAIN` options are
/// left at their `Default` values. The conversion currently always succeeds.
impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
    type Error = PlanError;

    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
        // If `WITH(raw)` is specified, ensure that the config will be as
        // representative for the original plan as possible.
        if v.raw {
            v.raw_plans = true;
            v.raw_syntax = true;
        }

        // Certain options default to enabled in release builds running on
        // staging or prod (where SOFT_ASSERTIONS are turned off).
        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();

        Ok(ExplainConfig {
            arity: v.arity.unwrap_or(enable_on_prod),
            cardinality: v.cardinality,
            column_names: v.column_names,
            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
            // Raw plans are never humanized, even if explicitly requested.
            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
            join_impls: v.join_implementations,
            keys: v.keys,
            // Likewise, linear chains are suppressed for raw plans.
            linear_chains: !v.raw_plans && v.linear_chains,
            no_fast_path: v.no_fast_path,
            no_notices: v.no_notices,
            node_ids: v.node_identifiers,
            non_negative: v.non_negative,
            raw_plans: v.raw_plans,
            raw_syntax: v.raw_syntax,
            verbose_syntax: false,
            redacted: v.redacted,
            subtree_size: v.subtree_size,
            equivalences: v.equivalences,
            timing: v.timing,
            types: v.types,
            // The ones that are initialized with `Default::default()` are not wired up to EXPLAIN.
            features: OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: Default::default(),
                enable_eager_delta_joins: v.enable_eager_delta_joins,
                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
                enable_consolidate_after_union_negate: Default::default(),
                enable_reduce_mfp_fusion: Default::default(),
                enable_cardinality_estimates: Default::default(),
                persist_fast_path_limit: Default::default(),
                reoptimize_imported_views: v.reoptimize_imported_views,
                enable_reduce_reduction: Default::default(),
                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse: v
                    .enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: Default::default(),
                enable_dequadratic_eqprop_map: Default::default(),
                enable_eq_classes_withholding_errors: Default::default(),
                enable_fast_path_plan_insights: Default::default(),
            },
        })
    }
}
566
/// Resolves the AST explainee of an `EXPLAIN` statement into a
/// [`plan::Explainee`].
///
/// Named catalog items (views, materialized views, indexes, and their
/// `Replan*` variants) are looked up by name and must have the matching item
/// type. `SELECT` and `CREATE ...` explainees are planned in full. `params`
/// is only used when planning a `SELECT`.
///
/// # Errors
///
/// Fails in any of these cases:
/// - a named item has the wrong type;
/// - an explained `CREATE` statement already sets `IF NOT EXISTS`;
/// - planning the underlying statement fails or yields an unexpected plan kind.
fn plan_explainee(
    scx: &StatementContext,
    explainee: Explainee<Aug>,
    params: &Params,
) -> Result<plan::Explainee, PlanError> {
    use crate::plan::ExplaineeStatement;

    // The `Replan*` variants share their lookup logic with the plain variants
    // below; this flag selects which planned variant to produce.
    let is_replan = matches!(
        explainee,
        Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
    );

    let explainee = match explainee {
        Explainee::View(name) | Explainee::ReplanView(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            let item_type = item.item_type();
            if item_type != CatalogItemType::View {
                sql_bail!("Expected {name} to be a view, not a {item_type}");
            }
            match is_replan {
                true => crate::plan::Explainee::ReplanView(item.id()),
                false => crate::plan::Explainee::View(item.id()),
            }
        }
        Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            let item_type = item.item_type();
            if item_type != CatalogItemType::MaterializedView {
                sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
            }
            match is_replan {
                true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
                false => crate::plan::Explainee::MaterializedView(item.id()),
            }
        }
        Explainee::Index(name) | Explainee::ReplanIndex(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            let item_type = item.item_type();
            if item_type != CatalogItemType::Index {
                sql_bail!("Expected {name} to be an index, not a {item_type}");
            }
            match is_replan {
                true => crate::plan::Explainee::ReplanIndex(item.id()),
                false => crate::plan::Explainee::Index(item.id()),
            }
        }
        Explainee::Select(select, broken) => {
            let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
            crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
        }
        Explainee::CreateView(mut stmt, broken) => {
            if stmt.if_exists != IfExistsBehavior::Skip {
                // If we don't force this parameter to Skip planning will
                // fail for names that already exist in the catalog. This
                // can happen even in `Replace` mode if the existing item
                // has dependencies.
                stmt.if_exists = IfExistsBehavior::Skip;
            } else {
                sql_bail!(
                    "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
                );
            }

            let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
                sql_bail!("expected CreateViewPlan plan");
            };

            crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
        }
        Explainee::CreateMaterializedView(mut stmt, broken) => {
            if stmt.if_exists != IfExistsBehavior::Skip {
                // If we don't force this parameter to Skip planning will
                // fail for names that already exist in the catalog. This
                // can happen even in `Replace` mode if the existing item
                // has dependencies.
                stmt.if_exists = IfExistsBehavior::Skip;
            } else {
                sql_bail!(
                    "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
                );
            }

            let Plan::CreateMaterializedView(plan) =
                ddl::plan_create_materialized_view(scx, *stmt)?
            else {
                sql_bail!("expected CreateMaterializedViewPlan plan");
            };

            crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
                broken,
                plan,
            })
        }
        Explainee::CreateIndex(mut stmt, broken) => {
            if !stmt.if_not_exists {
                // If we don't force this parameter to true planning will
                // fail for index items that already exist in the catalog.
                stmt.if_not_exists = true;
            } else {
                sql_bail!(
                    "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
                );
            }

            let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
                sql_bail!("expected CreateIndexPlan plan");
            };

            crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
        }
    };

    Ok(explainee)
}
684
685pub fn plan_explain_plan(
686    scx: &StatementContext,
687    explain: ExplainPlanStatement<Aug>,
688    params: &Params,
689) -> Result<Plan, PlanError> {
690    let (format, verbose_syntax) = match explain.format() {
691        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
692        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
693        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
694        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
695    };
696    let stage = explain.stage();
697
698    // Plan ExplainConfig.
699    let mut config = {
700        let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
701
702        if !scx.catalog.system_vars().persist_stats_filter_enabled() {
703            // If filtering is disabled, explain plans should not include pushdown info.
704            with_options.filter_pushdown = Some(false);
705        }
706
707        ExplainConfig::try_from(with_options)?
708    };
709    config.verbose_syntax = verbose_syntax;
710
711    let explainee = plan_explainee(scx, explain.explainee, params)?;
712
713    Ok(Plan::ExplainPlan(ExplainPlanPlan {
714        stage,
715        format,
716        config,
717        explainee,
718    }))
719}
720
721pub fn plan_explain_schema(
722    scx: &StatementContext,
723    explain_schema: ExplainSinkSchemaStatement<Aug>,
724) -> Result<Plan, PlanError> {
725    let ExplainSinkSchemaStatement {
726        schema_for,
727        // Parser limits to JSON.
728        format: _,
729        mut statement,
730    } = explain_schema;
731
732    // Force the sink's name to one that's guaranteed not to exist, by virtue of
733    // being a non-existent item in a schema under the system's control, so that
734    // `plan_create_sink` doesn't complain about the name already existing.
735    statement.name = Some(UnresolvedItemName::qualified(&[
736        ident!("mz_catalog"),
737        ident!("mz_explain_schema"),
738    ]));
739
740    crate::pure::purify_create_sink_avro_doc_on_options(
741        scx.catalog,
742        *statement.from.item_id(),
743        &mut statement.format,
744    )?;
745
746    match ddl::plan_create_sink(scx, statement)? {
747        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
748            StorageSinkConnection::Kafka(KafkaSinkConnection {
749                format:
750                    KafkaSinkFormat {
751                        key_format,
752                        value_format:
753                            KafkaSinkFormatType::Avro {
754                                schema: value_schema,
755                                ..
756                            },
757                        ..
758                    },
759                ..
760            }) => {
761                let schema = match schema_for {
762                    ExplainSinkSchemaFor::Key => key_format
763                        .and_then(|f| match f {
764                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
765                            _ => None,
766                        })
767                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
768                    ExplainSinkSchemaFor::Value => value_schema,
769                };
770
771                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
772                    sink_from: sink.from,
773                    json_schema: schema,
774                }))
775            }
776            _ => bail_unsupported!(
777                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
778            ),
779        },
780        _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
781    }
782}
783
784pub fn plan_explain_pushdown(
785    scx: &StatementContext,
786    statement: ExplainPushdownStatement<Aug>,
787    params: &Params,
788) -> Result<Plan, PlanError> {
789    scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
790    let explainee = plan_explainee(scx, statement.explainee, params)?;
791    Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
792}
793
794pub fn plan_explain_analyze(
795    scx: &StatementContext,
796    statement: ExplainAnalyzeStatement<Aug>,
797    params: &Params,
798) -> Result<Plan, PlanError> {
799    let explainee_name = statement
800        .explainee
801        .name()
802        .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
803        .full_name_str();
804    let explainee = plan_explainee(scx, statement.explainee, params)?;
805
806    match explainee {
807        plan::Explainee::Index(_index_id) => (),
808        plan::Explainee::MaterializedView(_item_id) => (),
809        _ => {
810            return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
811        }
812    };
813
814    // generate SQL query
815
816    /* WITH {CTEs}
817       SELECT REPEAT(' ', nesting * 2) || operator AS operator
818             {columns}
819        FROM      mz_introspection.mz_lir_mapping mlm
820             JOIN {from} USING (lir_id)
821             JOIN mz_introspection.mz_mappable_objects mo
822               ON (mlm.global_id = mo.global_id)
823       WHERE     mo.name = '{plan.explainee_name}'
824             AND {predicates}
825       ORDER BY lir_id DESC
826    */
827    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
828    let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
829    let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
830    let mut predicates = vec![format!("mo.name = '{}'", explainee_name)];
831    let mut order_by = vec!["mlm.lir_id DESC"];
832
833    match statement.properties {
834        ExplainAnalyzeProperty::Computation { properties, skew } => {
835            let mut worker_id = None;
836            let mut seen_properties = BTreeSet::new();
837            for property in properties {
838                // handle each property only once (belt and suspenders)
839                if !seen_properties.insert(property) {
840                    continue;
841                }
842
843                match property {
844                    ExplainAnalyzeComputationProperty::Memory => {
845                        ctes.push((
846                            "summary_memory",
847                            r#"
848  SELECT mlm.global_id AS global_id,
849         mlm.lir_id AS lir_id,
850         SUM(mas.size) AS total_memory,
851         SUM(mas.records) AS total_records,
852         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
853         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
854    FROM      mz_introspection.mz_lir_mapping mlm
855         JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
856           ON (mlm.operator_id_start <= mas.operator_id AND mas.operator_id < mlm.operator_id_end)
857GROUP BY mlm.global_id, mlm.lir_id"#,
858                        ));
859                        from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");
860
861                        if skew {
862                            ctes.push((
863                                "per_worker_memory",
864                                r#"
865  SELECT mlm.global_id AS global_id,
866         mlm.lir_id AS lir_id,
867         mas.worker_id AS worker_id,
868         SUM(mas.size) AS worker_memory,
869         SUM(mas.records) AS worker_records
870    FROM      mz_introspection.mz_lir_mapping mlm
871         JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
872           ON (mlm.operator_id_start <= mas.operator_id AND mas.operator_id < mlm.operator_id_end)
873GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
874                            ));
875                            from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");
876
877                            if let Some(worker_id) = worker_id {
878                                predicates.push(format!("pwm.worker_id = {worker_id}"));
879                            } else {
880                                worker_id = Some("pwm.worker_id");
881                                columns.push("pwm.worker_id AS worker_id");
882                                order_by.push("worker_id");
883                            }
884
885                            columns.extend([
886                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
887                                "pg_size_pretty(pwm.worker_memory) AS worker_memory",
888                                "pg_size_pretty(sm.avg_memory) AS avg_memory",
889                                "pg_size_pretty(sm.total_memory) AS total_memory",
890                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
891                                "pwm.worker_records AS worker_records",
892                                "sm.avg_records AS avg_records",
893                                "sm.total_records AS total_records",
894                            ]);
895                        } else {
896                            columns.extend([
897                                "pg_size_pretty(sm.total_memory) AS total_memory",
898                                "sm.total_records AS total_records",
899                            ]);
900                        }
901                    }
902                    ExplainAnalyzeComputationProperty::Cpu => {
903                        ctes.push((
904                            "summary_cpu",
905                            r#"
906  SELECT mlm.global_id AS global_id,
907         mlm.lir_id AS lir_id,
908         SUM(mse.elapsed_ns) AS total_ns,
909         CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
910    FROM      mz_introspection.mz_lir_mapping mlm
911         JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
912           ON (mlm.operator_id_start <= mse.id AND mse.id < mlm.operator_id_end)
913GROUP BY mlm.global_id, mlm.lir_id"#,
914                        ));
915                        from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");
916
917                        if skew {
918                            ctes.push((
919                                "per_worker_cpu",
920                                r#"
921  SELECT mlm.global_id AS global_id,
922         mlm.lir_id AS lir_id,
923         mse.worker_id AS worker_id,
924         SUM(mse.elapsed_ns) AS worker_ns
925    FROM      mz_introspection.mz_lir_mapping mlm
926         JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
927           ON (mlm.operator_id_start <= mse.id AND mse.id < mlm.operator_id_end)
928GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
929                            ));
930                            from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");
931
932                            if let Some(worker_id) = worker_id {
933                                predicates.push(format!("pwc.worker_id = {worker_id}"));
934                            } else {
935                                worker_id = Some("pwc.worker_id");
936                                columns.push("pwc.worker_id AS worker_id");
937                                order_by.push("worker_id");
938                            }
939
940                            columns.extend([
941                                "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
942                                "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
943                                "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
944                            ]);
945                        }
946                        columns.push(
947                            "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
948                        );
949                    }
950                }
951            }
952        }
953        ExplainAnalyzeProperty::Hints => {
954            columns.extend([
955                "megsa.levels AS levels",
956                "megsa.to_cut AS to_cut",
957                "megsa.hint AS hint",
958                "pg_size_pretty(savings) AS savings",
959            ]);
960            from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
961            "LEFT JOIN mz_introspection.mz_expected_group_size_advice megsa ON (megsa.dataflow_id = mdgi.id AND mlm.operator_id_start <= megsa.region_id AND megsa.region_id < mlm.operator_id_end)"]);
962        }
963    }
964
965    from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");
966
967    let ctes = if !ctes.is_empty() {
968        format!(
969            "WITH {}",
970            separated(
971                ",\n",
972                ctes.iter()
973                    .map(|(name, defn)| format!("{name} AS ({defn})"))
974            )
975        )
976    } else {
977        String::new()
978    };
979    let columns = separated(", ", columns);
980    let from = separated(" ", from);
981    let predicates = separated(" AND ", predicates);
982    let order_by = separated(", ", order_by);
983    let query = format!(
984        r#"{ctes}
985SELECT {columns}
986FROM {from}
987WHERE {predicates}
988ORDER BY {order_by}"#
989    );
990
991    if statement.as_sql {
992        let rows = vec![Row::pack_slice(&[Datum::String(
993            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
994                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
995            })?,
996        )])];
997        let typ = RelationType::new(vec![ScalarType::String.nullable(false)]);
998
999        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1000    } else {
1001        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1002        show_select.plan()
1003    }
1004}
1005
1006pub fn plan_explain_timestamp(
1007    scx: &StatementContext,
1008    explain: ExplainTimestampStatement<Aug>,
1009) -> Result<Plan, PlanError> {
1010    let (format, _verbose_syntax) = match explain.format() {
1011        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1012        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1013        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1014        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1015    };
1016
1017    let raw_plan = {
1018        let query::PlannedRootQuery {
1019            expr: raw_plan,
1020            desc: _,
1021            finishing: _,
1022            scope: _,
1023        } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1024        if raw_plan.contains_parameters()? {
1025            return Err(PlanError::ParameterNotAllowed(
1026                "EXPLAIN TIMESTAMP".to_string(),
1027            ));
1028        }
1029
1030        raw_plan
1031    };
1032    let when = query::plan_as_of(scx, explain.select.as_of)?;
1033
1034    Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1035        format,
1036        raw_plan,
1037        when,
1038    }))
1039}
1040
1041/// Plans and decorrelates a [`Query`]. Like [`query::plan_root_query`], but
1042/// returns an [`MirRelationExpr`], which cannot include correlated expressions.
1043#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."]
1044pub fn plan_query(
1045    scx: &StatementContext,
1046    query: Query<Aug>,
1047    params: &Params,
1048    lifetime: QueryLifetime,
1049) -> Result<query::PlannedRootQuery<MirRelationExpr>, PlanError> {
1050    let query::PlannedRootQuery {
1051        mut expr,
1052        desc,
1053        finishing,
1054        scope,
1055    } = query::plan_root_query(scx, query, lifetime)?;
1056    expr.bind_parameters(scx, lifetime, params)?;
1057
1058    Ok(query::PlannedRootQuery {
1059        // No metrics passed! One more reason not to use this deprecated function.
1060        expr: expr.lower(scx.catalog.system_vars(), None)?,
1061        desc,
1062        finishing,
1063        scope,
1064    })
1065}
1066
// Generates `SubscribeOptionExtracted`, the typed form of a `SUBSCRIBE`
// statement's `WITH (...)` options. Its `snapshot` and `progress` fields are
// optional booleans; callers below apply the defaults themselves via
// `unwrap_or` (`SNAPSHOT` defaults to true, `PROGRESS` to false).
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1068
1069pub fn describe_subscribe(
1070    scx: &StatementContext,
1071    stmt: SubscribeStatement<Aug>,
1072) -> Result<StatementDesc, PlanError> {
1073    let relation_desc = match stmt.relation {
1074        SubscribeRelation::Name(name) => {
1075            let item = scx.get_item_by_resolved_name(&name)?;
1076            item.desc(&scx.catalog.resolve_full_name(item.name()))?
1077                .into_owned()
1078        }
1079        SubscribeRelation::Query(query) => {
1080            let query::PlannedRootQuery { desc, .. } =
1081                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
1082            desc
1083        }
1084    };
1085    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
1086    let progress = progress.unwrap_or(false);
1087    let mut desc = RelationDesc::builder().with_column(
1088        "mz_timestamp",
1089        ScalarType::Numeric {
1090            max_scale: Some(NumericMaxScale::ZERO),
1091        }
1092        .nullable(false),
1093    );
1094    if progress {
1095        desc = desc.with_column("mz_progressed", ScalarType::Bool.nullable(false));
1096    }
1097
1098    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
1099    match stmt.output {
1100        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
1101            desc = desc.with_column("mz_diff", ScalarType::Int64.nullable(true));
1102            for (name, mut ty) in relation_desc.into_iter() {
1103                if progress {
1104                    ty.nullable = true;
1105                }
1106                desc = desc.with_column(name, ty);
1107            }
1108        }
1109        SubscribeOutput::EnvelopeUpsert { key_columns }
1110        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
1111            desc = desc.with_column("mz_state", ScalarType::String.nullable(true));
1112            let key_columns = key_columns
1113                .into_iter()
1114                .map(normalize::column_name)
1115                .collect_vec();
1116            let mut before_values_desc = RelationDesc::builder();
1117            let mut after_values_desc = RelationDesc::builder();
1118
1119            // Add the key columns in the order that they're specified.
1120            for column_name in &key_columns {
1121                let mut column_ty = relation_desc
1122                    .get_by_name(column_name)
1123                    .map(|(_pos, ty)| ty.clone())
1124                    .ok_or_else(|| PlanError::UnknownColumn {
1125                        table: None,
1126                        column: column_name.clone(),
1127                        similar: Box::new([]),
1128                    })?;
1129                if progress {
1130                    column_ty.nullable = true;
1131                }
1132                desc = desc.with_column(column_name, column_ty);
1133            }
1134
1135            // Then add the remaining columns in the order from the original
1136            // table, filtering out the key columns since we added those above.
1137            for (mut name, mut ty) in relation_desc
1138                .into_iter()
1139                .filter(|(name, _ty)| !key_columns.contains(name))
1140            {
1141                ty.nullable = true;
1142                before_values_desc =
1143                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
1144                if debezium {
1145                    name = format!("after_{}", name).into();
1146                }
1147                after_values_desc = after_values_desc.with_column(name, ty);
1148            }
1149
1150            if debezium {
1151                desc = desc.concat(before_values_desc);
1152            }
1153            desc = desc.concat(after_values_desc);
1154        }
1155    }
1156    Ok(StatementDesc::new(Some(desc.finish())))
1157}
1158
1159pub fn plan_subscribe(
1160    scx: &StatementContext,
1161    SubscribeStatement {
1162        relation,
1163        options,
1164        as_of,
1165        up_to,
1166        output,
1167    }: SubscribeStatement<Aug>,
1168    params: &Params,
1169    copy_to: Option<CopyFormat>,
1170) -> Result<Plan, PlanError> {
1171    let (from, desc, scope) = match relation {
1172        SubscribeRelation::Name(name) => {
1173            let entry = scx.get_item_by_resolved_name(&name)?;
1174            let desc = match entry.desc(&scx.catalog.resolve_full_name(entry.name())) {
1175                Ok(desc) => desc,
1176                Err(..) => sql_bail!(
1177                    "'{}' cannot be subscribed to because it is a {}",
1178                    name.full_name_str(),
1179                    entry.item_type(),
1180                ),
1181            };
1182            let item_name = match name {
1183                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
1184                _ => None,
1185            };
1186            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
1187            (
1188                SubscribeFrom::Id(entry.global_id()),
1189                desc.into_owned(),
1190                scope,
1191            )
1192        }
1193        SubscribeRelation::Query(query) => {
1194            #[allow(deprecated)] // TODO(aalexandrov): Use HirRelationExpr in Subscribe
1195            let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?;
1196            // There's no way to apply finishing operations to a `SUBSCRIBE` directly, so the
1197            // finishing should have already been turned into a `TopK` by
1198            // `plan_query` / `plan_root_query`, upon seeing the `QueryLifetime::Subscribe`.
1199            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
1200                &query.finishing,
1201                query.desc.arity()
1202            ));
1203            let desc = query.desc.clone();
1204            (
1205                SubscribeFrom::Query {
1206                    expr: query.expr,
1207                    desc: query.desc,
1208                },
1209                desc,
1210                query.scope,
1211            )
1212        }
1213    };
1214
1215    let when = query::plan_as_of(scx, as_of)?;
1216    let up_to = up_to.map(|up_to| plan_up_to(scx, up_to)).transpose()?;
1217
1218    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
1219    let ecx = ExprContext {
1220        qcx: &qcx,
1221        name: "",
1222        scope: &scope,
1223        relation_type: desc.typ(),
1224        allow_aggregates: false,
1225        allow_subqueries: true,
1226        allow_parameters: true,
1227        allow_windows: false,
1228    };
1229
1230    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1231    let output = match output {
1232        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
1233        SubscribeOutput::EnvelopeUpsert { key_columns } => {
1234            let order_by = key_columns
1235                .iter()
1236                .map(|ident| OrderByExpr {
1237                    expr: Expr::Identifier(vec![ident.clone()]),
1238                    asc: None,
1239                    nulls_last: None,
1240                })
1241                .collect_vec();
1242            let (order_by, map_exprs) = query::plan_order_by_exprs(
1243                &ExprContext {
1244                    name: "ENVELOPE UPSERT KEY clause",
1245                    ..ecx
1246                },
1247                &order_by[..],
1248                &output_columns[..],
1249            )?;
1250            if !map_exprs.is_empty() {
1251                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
1252            }
1253            plan::SubscribeOutput::EnvelopeUpsert {
1254                order_by_keys: order_by,
1255            }
1256        }
1257        SubscribeOutput::EnvelopeDebezium { key_columns } => {
1258            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
1259            let order_by = key_columns
1260                .iter()
1261                .map(|ident| OrderByExpr {
1262                    expr: Expr::Identifier(vec![ident.clone()]),
1263                    asc: None,
1264                    nulls_last: None,
1265                })
1266                .collect_vec();
1267            let (order_by, map_exprs) = query::plan_order_by_exprs(
1268                &ExprContext {
1269                    name: "ENVELOPE DEBEZIUM KEY clause",
1270                    ..ecx
1271                },
1272                &order_by[..],
1273                &output_columns[..],
1274            )?;
1275            if !map_exprs.is_empty() {
1276                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
1277            }
1278            plan::SubscribeOutput::EnvelopeDebezium {
1279                order_by_keys: order_by,
1280            }
1281        }
1282        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
1283            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
1284            let mz_diff = "mz_diff".into();
1285            let output_columns = std::iter::once((0, &mz_diff))
1286                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
1287                .collect_vec();
1288            match query::plan_order_by_exprs(
1289                &ExprContext {
1290                    name: "WITHIN TIMESTAMP ORDER BY clause",
1291                    ..ecx
1292                },
1293                &order_by[..],
1294                &output_columns[..],
1295            ) {
1296                Err(PlanError::UnknownColumn {
1297                    table: None,
1298                    column,
1299                    similar: _,
1300                }) if &column == &mz_diff => {
1301                    // mz_diff is being used in an expression. Since mz_diff isn't part of the table
1302                    // it looks like an unknown column. Instead, return a better error
1303                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1304                }
1305                Err(e) => return Err(e),
1306                Ok((order_by, map_exprs)) => {
1307                    if !map_exprs.is_empty() {
1308                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1309                    }
1310
1311                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
1312                }
1313            }
1314        }
1315    };
1316
1317    let SubscribeOptionExtracted {
1318        progress, snapshot, ..
1319    } = options.try_into()?;
1320    Ok(Plan::Subscribe(SubscribePlan {
1321        from,
1322        when,
1323        up_to,
1324        with_snapshot: snapshot.unwrap_or(true),
1325        copy_to,
1326        emit_progress: progress.unwrap_or(false),
1327        output,
1328    }))
1329}
1330
1331pub fn describe_copy_from_table(
1332    scx: &StatementContext,
1333    table_name: <Aug as AstInfo>::ItemName,
1334    columns: Vec<Ident>,
1335) -> Result<StatementDesc, PlanError> {
1336    let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1337    Ok(StatementDesc::new(Some(desc)))
1338}
1339
1340pub fn describe_copy_item(
1341    scx: &StatementContext,
1342    object_name: <Aug as AstInfo>::ItemName,
1343    columns: Vec<Ident>,
1344) -> Result<StatementDesc, PlanError> {
1345    let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1346    Ok(StatementDesc::new(Some(desc)))
1347}
1348
1349pub fn describe_copy(
1350    scx: &StatementContext,
1351    CopyStatement {
1352        relation,
1353        direction,
1354        ..
1355    }: CopyStatement<Aug>,
1356) -> Result<StatementDesc, PlanError> {
1357    Ok(match (relation, direction) {
1358        (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1359            describe_copy_item(scx, name, columns)?
1360        }
1361        (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1362            describe_copy_from_table(scx, name, columns)?
1363        }
1364        (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1365        (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1366    }
1367    .with_is_copy())
1368}
1369
1370fn plan_copy_to_expr(
1371    scx: &StatementContext,
1372    select_plan: SelectPlan,
1373    desc: RelationDesc,
1374    to: &Expr<Aug>,
1375    format: CopyFormat,
1376    options: CopyOptionExtracted,
1377) -> Result<Plan, PlanError> {
1378    let conn_id = match options.aws_connection {
1379        Some(conn_id) => CatalogItemId::from(conn_id),
1380        None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1381    };
1382    let connection = scx.get_item(&conn_id).connection()?;
1383
1384    match connection {
1385        mz_storage_types::connections::Connection::Aws(_) => {}
1386        _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1387    }
1388
1389    let format = match format {
1390        CopyFormat::Csv => {
1391            let quote = extract_byte_param_value(options.quote, "quote")?;
1392            let escape = extract_byte_param_value(options.escape, "escape")?;
1393            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1394            S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1395                CopyCsvFormatParams::try_new(
1396                    delimiter,
1397                    quote,
1398                    escape,
1399                    options.header,
1400                    options.null,
1401                )
1402                .map_err(|e| sql_err!("{}", e))?,
1403            ))
1404        }
1405        CopyFormat::Parquet => {
1406            // Validate that the output desc can be formatted as parquet
1407            ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
1408            S3SinkFormat::Parquet
1409        }
1410        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1411        CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1412    };
1413
1414    // Converting the to expr to a HirScalarExpr
1415    let mut to_expr = to.clone();
1416    transform_ast::transform(scx, &mut to_expr)?;
1417    let relation_type = RelationDesc::empty();
1418    let ecx = &ExprContext {
1419        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1420        name: "COPY TO target",
1421        scope: &Scope::empty(),
1422        relation_type: relation_type.typ(),
1423        allow_aggregates: false,
1424        allow_subqueries: false,
1425        allow_parameters: false,
1426        allow_windows: false,
1427    };
1428
1429    let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &ScalarType::String)?;
1430
1431    if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1432        sql_bail!(
1433            "MAX FILE SIZE cannot be less than {}",
1434            MIN_S3_SINK_FILE_SIZE
1435        );
1436    }
1437    if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1438        sql_bail!(
1439            "MAX FILE SIZE cannot be greater than {}",
1440            MAX_S3_SINK_FILE_SIZE
1441        );
1442    }
1443
1444    Ok(Plan::CopyTo(CopyToPlan {
1445        select_plan,
1446        desc,
1447        to,
1448        connection: connection.to_owned(),
1449        connection_id: conn_id,
1450        format,
1451        max_file_size: options.max_file_size.as_bytes(),
1452    }))
1453}
1454
1455fn plan_copy_from(
1456    scx: &StatementContext,
1457    target: &CopyTarget<Aug>,
1458    table_name: ResolvedItemName,
1459    columns: Vec<Ident>,
1460    format: CopyFormat,
1461    options: CopyOptionExtracted,
1462) -> Result<Plan, PlanError> {
1463    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
1464        match option {
1465            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
1466            None => Ok(()),
1467        }
1468    }
1469
1470    let source = match target {
1471        CopyTarget::Stdin => CopyFromSource::Stdin,
1472        CopyTarget::Expr(from) => {
1473            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;
1474
1475            // Converting the expr to an HirScalarExpr
1476            let mut from_expr = from.clone();
1477            transform_ast::transform(scx, &mut from_expr)?;
1478            let relation_type = RelationDesc::empty();
1479            let ecx = &ExprContext {
1480                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1481                name: "COPY FROM target",
1482                scope: &Scope::empty(),
1483                relation_type: relation_type.typ(),
1484                allow_aggregates: false,
1485                allow_subqueries: false,
1486                allow_parameters: false,
1487                allow_windows: false,
1488            };
1489            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &ScalarType::String)?;
1490
1491            match options.aws_connection {
1492                Some(conn_id) => {
1493                    let conn_id = CatalogItemId::from(conn_id);
1494
1495                    // Validate the connection type is one we expect.
1496                    let connection = match scx.get_item(&conn_id).connection()? {
1497                        mz_storage_types::connections::Connection::Aws(conn) => conn,
1498                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
1499                    };
1500
1501                    CopyFromSource::AwsS3 {
1502                        uri: from,
1503                        connection,
1504                        connection_id: conn_id,
1505                    }
1506                }
1507                None => CopyFromSource::Url(from),
1508            }
1509        }
1510        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
1511    };
1512
1513    let params = match format {
1514        CopyFormat::Text => {
1515            only_available_with_csv(options.quote, "quote")?;
1516            only_available_with_csv(options.escape, "escape")?;
1517            only_available_with_csv(options.header, "HEADER")?;
1518            let delimiter =
1519                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
1520            let null = match options.null {
1521                Some(null) => Cow::from(null),
1522                None => Cow::from("\\N"),
1523            };
1524            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
1525        }
1526        CopyFormat::Csv => {
1527            let quote = extract_byte_param_value(options.quote, "quote")?;
1528            let escape = extract_byte_param_value(options.escape, "escape")?;
1529            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1530            CopyFormatParams::Csv(
1531                CopyCsvFormatParams::try_new(
1532                    delimiter,
1533                    quote,
1534                    escape,
1535                    options.header,
1536                    options.null,
1537                )
1538                .map_err(|e| sql_err!("{}", e))?,
1539            )
1540        }
1541        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1542        CopyFormat::Parquet => CopyFormatParams::Parquet,
1543    };
1544
1545    let filter = match (options.files, options.pattern) {
1546        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
1547        (Some(files), None) => Some(CopyFromFilter::Files(files)),
1548        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
1549        (None, None) => None,
1550    };
1551
1552    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
1553        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
1554    }
1555
1556    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;
1557
1558    let Some(mfp) = maybe_mfp else {
1559        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
1560    };
1561
1562    Ok(Plan::CopyFrom(CopyFromPlan {
1563        id,
1564        source,
1565        columns,
1566        source_desc,
1567        mfp,
1568        params,
1569        filter,
1570    }))
1571}
1572
1573fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
1574    match v {
1575        Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
1576        Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
1577        None => Ok(None),
1578    }
1579}
1580
// Generates `CopyOptionExtracted`, the typed form of a `COPY ... WITH (...)`
// option list. `plan_copy` builds it with `CopyOptionExtracted::try_from` and
// hands it to `plan_copy_from` or `plan_copy_to_expr`. Each `(Name, Type)`
// pair becomes a snake_case field (e.g. `options.max_file_size`). Options
// without a `Default(...)` are read as `Option<Type>` and are `None` when the
// user did not supply them.
generate_extracted_config!(
    CopyOption,
    // Format name; matched case-insensitively in `plan_copy`.
    (Format, String),
    // For COPY FROM, DELIMITER, ESCAPE and QUOTE must be exactly one byte
    // (enforced by `extract_byte_param_value`).
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    // Catalog object converted to a `CatalogItemId` in `plan_copy_from`.
    (AwsConnection, with_options::Object),
    // The only option with a fallback: `ByteSize::mb(256)` when unset.
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    // FILES and PATTERN filter remote objects in `plan_copy_from`; at most
    // one of them may be given.
    (Files, Vec<String>),
    (Pattern, String)
);
1594
1595pub fn plan_copy(
1596    scx: &StatementContext,
1597    CopyStatement {
1598        relation,
1599        direction,
1600        target,
1601        options,
1602    }: CopyStatement<Aug>,
1603) -> Result<Plan, PlanError> {
1604    let options = CopyOptionExtracted::try_from(options)?;
1605    // Parse any user-provided FORMAT option. If not provided, will default to
1606    // Text for COPY TO STDOUT and COPY FROM STDIN, but will error for COPY TO <expr>.
1607    let format = options
1608        .format
1609        .as_ref()
1610        .map(|format| match format.to_lowercase().as_str() {
1611            "text" => Ok(CopyFormat::Text),
1612            "csv" => Ok(CopyFormat::Csv),
1613            "binary" => Ok(CopyFormat::Binary),
1614            "parquet" => Ok(CopyFormat::Parquet),
1615            _ => sql_bail!("unknown FORMAT: {}", format),
1616        })
1617        .transpose()?;
1618
1619    match (&direction, &target) {
1620        (CopyDirection::To, CopyTarget::Stdout) => {
1621            if options.delimiter.is_some() {
1622                sql_bail!("COPY TO does not support DELIMITER option yet");
1623            }
1624            if options.quote.is_some() {
1625                sql_bail!("COPY TO does not support QUOTE option yet");
1626            }
1627            if options.null.is_some() {
1628                sql_bail!("COPY TO does not support NULL option yet");
1629            }
1630            match relation {
1631                CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
1632                CopyRelation::Select(stmt) => Ok(plan_select(
1633                    scx,
1634                    stmt,
1635                    &Params::empty(),
1636                    Some(format.unwrap_or(CopyFormat::Text)),
1637                )?),
1638                CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
1639                    scx,
1640                    stmt,
1641                    &Params::empty(),
1642                    Some(format.unwrap_or(CopyFormat::Text)),
1643                )?),
1644            }
1645        }
1646        (CopyDirection::From, target) => match relation {
1647            CopyRelation::Named { name, columns } => plan_copy_from(
1648                scx,
1649                target,
1650                name,
1651                columns,
1652                format.unwrap_or(CopyFormat::Text),
1653                options,
1654            ),
1655            _ => sql_bail!("COPY FROM {} not supported", target),
1656        },
1657        (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
1658            // System users are always allowed to use this feature, even when
1659            // the flag is disabled, so that we can dogfood for analytics in
1660            // production environments. The feature is stable enough that we're
1661            // not worried about it crashing.
1662            if !scx.catalog.active_role_id().is_system() {
1663                scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
1664            }
1665
1666            let format = match format {
1667                Some(inner) => inner,
1668                _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
1669            };
1670
1671            let stmt = match relation {
1672                CopyRelation::Named { name, columns } => {
1673                    if !columns.is_empty() {
1674                        // TODO(mouli): Add support for this
1675                        sql_bail!(
1676                            "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
1677                        );
1678                    }
1679                    // Generate a synthetic SELECT query that just gets the table
1680                    let query = Query {
1681                        ctes: CteBlock::empty(),
1682                        body: SetExpr::Table(name),
1683                        order_by: vec![],
1684                        limit: None,
1685                        offset: None,
1686                    };
1687                    SelectStatement { query, as_of: None }
1688                }
1689                CopyRelation::Select(stmt) => {
1690                    if !stmt.query.order_by.is_empty() {
1691                        sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
1692                    }
1693                    stmt
1694                }
1695                _ => sql_bail!("COPY {} {} not supported", direction, target),
1696            };
1697
1698            let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
1699            plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
1700        }
1701        _ => sql_bail!("COPY {} {} not supported", direction, target),
1702    }
1703}