mz_sql/plan/statement/
dml.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data manipulation language (DML).
11//!
12//! This module houses the handlers for statements that manipulate data, like
13//! `INSERT`, `SELECT`, `SUBSCRIBE`, and `COPY`.
14
15use std::borrow::Cow;
16use std::collections::BTreeMap;
17
18use itertools::Itertools;
19
20use mz_arrow_util::builder::ArrowBuilder;
21use mz_expr::{MirRelationExpr, RowSetFinishing};
22use mz_ore::num::NonNeg;
23use mz_ore::soft_panic_or_log;
24use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
25use mz_repr::adt::numeric::NumericMaxScale;
26use mz_repr::bytes::ByteSize;
27use mz_repr::explain::{ExplainConfig, ExplainFormat};
28use mz_repr::optimize::OptimizerFeatureOverrides;
29use mz_repr::{CatalogItemId, Datum, RelationDesc, ScalarType};
30use mz_sql_parser::ast::{
31    CteBlock, ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement,
32    ExplainSinkSchemaFor, ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr,
33    IfExistsBehavior, OrderByExpr, SetExpr, SubscribeOutput, UnresolvedItemName,
34};
35use mz_sql_parser::ident;
36use mz_storage_types::sinks::{
37    KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
38    MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
39};
40
41use crate::ast::display::AstDisplay;
42use crate::ast::{
43    AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
44    DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
45    SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
46    UpdateStatement,
47};
48use crate::catalog::CatalogItemType;
49use crate::names::{Aug, ResolvedItemName};
50use crate::normalize;
51use crate::plan::query::{ExprContext, QueryLifetime, plan_expr, plan_up_to};
52use crate::plan::scope::Scope;
53use crate::plan::statement::{StatementContext, StatementDesc, ddl};
54use crate::plan::{
55    self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
56    ExplainTimestampPlan, side_effecting_func, transform_ast,
57};
58use crate::plan::{
59    CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
60    QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
61};
62use crate::plan::{CopyFromSource, with_options};
63use crate::session::vars::{self, ENABLE_COPY_FROM_REMOTE};
64
65// TODO(benesch): currently, describing a `SELECT` or `INSERT` query
66// plans the whole query to determine its shape and parameter types,
67// and then throws away that plan. If we were smarter, we'd stash that
68// plan somewhere so we don't have to recompute it when the query is
69// executed.
70
71pub fn describe_insert(
72    scx: &StatementContext,
73    InsertStatement {
74        table_name,
75        columns,
76        source,
77        returning,
78    }: InsertStatement<Aug>,
79) -> Result<StatementDesc, PlanError> {
80    let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
81    let desc = if returning.expr.is_empty() {
82        None
83    } else {
84        Some(returning.desc)
85    };
86    Ok(StatementDesc::new(desc))
87}
88
89pub fn plan_insert(
90    scx: &StatementContext,
91    InsertStatement {
92        table_name,
93        columns,
94        source,
95        returning,
96    }: InsertStatement<Aug>,
97    params: &Params,
98) -> Result<Plan, PlanError> {
99    let (id, mut expr, returning) =
100        query::plan_insert_query(scx, table_name, columns, source, returning)?;
101    expr.bind_parameters(params)?;
102    let returning = returning
103        .expr
104        .into_iter()
105        .map(|expr| expr.lower_uncorrelated())
106        .collect::<Result<Vec<_>, _>>()?;
107
108    Ok(Plan::Insert(InsertPlan {
109        id,
110        values: expr,
111        returning,
112    }))
113}
114
115pub fn describe_delete(
116    scx: &StatementContext,
117    stmt: DeleteStatement<Aug>,
118) -> Result<StatementDesc, PlanError> {
119    query::plan_delete_query(scx, stmt)?;
120    Ok(StatementDesc::new(None))
121}
122
123pub fn plan_delete(
124    scx: &StatementContext,
125    stmt: DeleteStatement<Aug>,
126    params: &Params,
127) -> Result<Plan, PlanError> {
128    let rtw_plan = query::plan_delete_query(scx, stmt)?;
129    plan_read_then_write(MutationKind::Delete, params, rtw_plan)
130}
131
132pub fn describe_update(
133    scx: &StatementContext,
134    stmt: UpdateStatement<Aug>,
135) -> Result<StatementDesc, PlanError> {
136    query::plan_update_query(scx, stmt)?;
137    Ok(StatementDesc::new(None))
138}
139
140pub fn plan_update(
141    scx: &StatementContext,
142    stmt: UpdateStatement<Aug>,
143    params: &Params,
144) -> Result<Plan, PlanError> {
145    let rtw_plan = query::plan_update_query(scx, stmt)?;
146    plan_read_then_write(MutationKind::Update, params, rtw_plan)
147}
148
149pub fn plan_read_then_write(
150    kind: MutationKind,
151    params: &Params,
152    query::ReadThenWritePlan {
153        id,
154        mut selection,
155        finishing,
156        assignments,
157    }: query::ReadThenWritePlan,
158) -> Result<Plan, PlanError> {
159    selection.bind_parameters(params)?;
160    let mut assignments_outer = BTreeMap::new();
161    for (idx, mut set) in assignments {
162        set.bind_parameters(params)?;
163        let set = set.lower_uncorrelated()?;
164        assignments_outer.insert(idx, set);
165    }
166
167    Ok(Plan::ReadThenWrite(ReadThenWritePlan {
168        id,
169        selection,
170        finishing,
171        assignments: assignments_outer,
172        kind,
173        returning: Vec::new(),
174    }))
175}
176
177pub fn describe_select(
178    scx: &StatementContext,
179    stmt: SelectStatement<Aug>,
180) -> Result<StatementDesc, PlanError> {
181    if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
182        return Ok(StatementDesc::new(Some(desc)));
183    }
184
185    let query::PlannedRootQuery { desc, .. } =
186        query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
187    Ok(StatementDesc::new(Some(desc)))
188}
189
190pub fn plan_select(
191    scx: &StatementContext,
192    select: SelectStatement<Aug>,
193    params: &Params,
194    copy_to: Option<CopyFormat>,
195) -> Result<Plan, PlanError> {
196    if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
197        return Ok(Plan::SideEffectingFunc(f));
198    }
199
200    let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
201    Ok(Plan::Select(plan))
202}
203
204fn plan_select_inner(
205    scx: &StatementContext,
206    select: SelectStatement<Aug>,
207    params: &Params,
208    copy_to: Option<CopyFormat>,
209) -> Result<(SelectPlan, RelationDesc), PlanError> {
210    let when = query::plan_as_of(scx, select.as_of.clone())?;
211    let query::PlannedRootQuery {
212        mut expr,
213        desc,
214        finishing,
215        scope: _,
216    } = query::plan_root_query(scx, select.query.clone(), QueryLifetime::OneShot)?;
217    expr.bind_parameters(params)?;
218
219    // A top-level limit cannot be data dependent so eagerly evaluate it.
220    let limit = match finishing.limit {
221        None => None,
222        Some(mut limit) => {
223            limit.bind_parameters(params)?;
224            let Some(limit) = limit.as_literal() else {
225                sql_bail!("Top-level LIMIT must be a constant expression")
226            };
227            match limit {
228                Datum::Null => None,
229                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
230                _ => {
231                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
232                    sql_bail!("LIMIT must be a non negative INT or NULL")
233                }
234            }
235        }
236    };
237
238    let plan = SelectPlan {
239        source: expr,
240        when,
241        finishing: RowSetFinishing {
242            limit,
243            offset: finishing.offset,
244            project: finishing.project,
245            order_by: finishing.order_by,
246        },
247        copy_to,
248        select: Some(Box::new(select)),
249    };
250
251    Ok((plan, desc))
252}
253
254pub fn describe_explain_plan(
255    scx: &StatementContext,
256    explain: ExplainPlanStatement<Aug>,
257) -> Result<StatementDesc, PlanError> {
258    let mut relation_desc = RelationDesc::builder();
259
260    match explain.stage() {
261        ExplainStage::RawPlan => {
262            let name = "Raw Plan";
263            relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
264        }
265        ExplainStage::DecorrelatedPlan => {
266            let name = "Decorrelated Plan";
267            relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
268        }
269        ExplainStage::LocalPlan => {
270            let name = "Locally Optimized Plan";
271            relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
272        }
273        ExplainStage::GlobalPlan => {
274            let name = "Optimized Plan";
275            relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
276        }
277        ExplainStage::PhysicalPlan => {
278            let name = "Physical Plan";
279            relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
280        }
281        ExplainStage::Trace => {
282            relation_desc = relation_desc
283                .with_column("Time", ScalarType::UInt64.nullable(false))
284                .with_column("Path", ScalarType::String.nullable(false))
285                .with_column("Plan", ScalarType::String.nullable(false));
286        }
287        ExplainStage::PlanInsights => {
288            let name = "Plan Insights";
289            relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
290        }
291    };
292    let relation_desc = relation_desc.finish();
293
294    Ok(
295        StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
296            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
297            _ => vec![],
298        }),
299    )
300}
301
302pub fn describe_explain_pushdown(
303    scx: &StatementContext,
304    statement: ExplainPushdownStatement<Aug>,
305) -> Result<StatementDesc, PlanError> {
306    let relation_desc = RelationDesc::builder()
307        .with_column("Source", ScalarType::String.nullable(false))
308        .with_column("Total Bytes", ScalarType::UInt64.nullable(false))
309        .with_column("Selected Bytes", ScalarType::UInt64.nullable(false))
310        .with_column("Total Parts", ScalarType::UInt64.nullable(false))
311        .with_column("Selected Parts", ScalarType::UInt64.nullable(false))
312        .finish();
313
314    Ok(
315        StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
316            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
317            _ => vec![],
318        }),
319    )
320}
321
322pub fn describe_explain_timestamp(
323    scx: &StatementContext,
324    ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
325) -> Result<StatementDesc, PlanError> {
326    let relation_desc = RelationDesc::builder()
327        .with_column("Timestamp", ScalarType::String.nullable(false))
328        .finish();
329
330    Ok(StatementDesc::new(Some(relation_desc))
331        .with_params(describe_select(scx, select)?.param_types))
332}
333
334pub fn describe_explain_schema(
335    _: &StatementContext,
336    ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
337) -> Result<StatementDesc, PlanError> {
338    let relation_desc = RelationDesc::builder()
339        .with_column("Schema", ScalarType::String.nullable(false))
340        .finish();
341    Ok(StatementDesc::new(Some(relation_desc)))
342}
343
// Declares the options understood by `EXPLAIN ... PLAN` and generates the
// `ExplainPlanOptionExtracted` type that the planner converts into an
// `ExplainConfig` (see the `TryFrom` impl below). Each entry is
// `(option name, field type, default)`; the extracted fields are accessed
// under snake_case names (e.g. `RawPlans` is read as `raw_plans`).
//
// Currently, there are two reasons for why a flag should be `Option<bool>` instead of simply
// `bool`:
// - When it's an override of a global feature flag, for example optimizer feature flags. In this
//   case, we need not just false and true, but also None to say "take the value of the global
//   flag".
// - When it's an override of whether SOFT_ASSERTIONS are enabled. For example, when `Arity` is not
//   explicitly given in the EXPLAIN command, then we'd like staging and prod to default to true,
//   but otherwise we'd like to default to false.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    // The remaining entries are optimizer feature overrides; `None` leaves the
    // corresponding override unset.
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
386
387impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
388    type Error = PlanError;
389
390    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
391        // If `WITH(raw)` is specified, ensure that the config will be as
392        // representative for the original plan as possible.
393        if v.raw {
394            v.raw_plans = true;
395            v.raw_syntax = true;
396        }
397
398        // Certain config should default to be enabled in release builds running on
399        // staging or prod (where SOFT_ASSERTIONS are turned off).
400        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();
401
402        Ok(ExplainConfig {
403            arity: v.arity.unwrap_or(enable_on_prod),
404            cardinality: v.cardinality,
405            column_names: v.column_names,
406            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
407            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
408            join_impls: v.join_implementations,
409            keys: v.keys,
410            linear_chains: !v.raw_plans && v.linear_chains,
411            no_fast_path: v.no_fast_path,
412            no_notices: v.no_notices,
413            node_ids: v.node_identifiers,
414            non_negative: v.non_negative,
415            raw_plans: v.raw_plans,
416            raw_syntax: v.raw_syntax,
417            redacted: v.redacted,
418            subtree_size: v.subtree_size,
419            equivalences: v.equivalences,
420            timing: v.timing,
421            types: v.types,
422            // The ones that are initialized with `Default::default()` are not wired up to EXPLAIN.
423            features: OptimizerFeatureOverrides {
424                enable_eager_delta_joins: v.enable_eager_delta_joins,
425                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
426                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
427                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
428                enable_consolidate_after_union_negate: Default::default(),
429                enable_reduce_mfp_fusion: Default::default(),
430                enable_cardinality_estimates: Default::default(),
431                persist_fast_path_limit: Default::default(),
432                reoptimize_imported_views: v.reoptimize_imported_views,
433                enable_reduce_reduction: Default::default(),
434                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
435                enable_projection_pushdown_after_relation_cse: v
436                    .enable_projection_pushdown_after_relation_cse,
437                enable_less_reduce_in_eqprop: Default::default(),
438                enable_dequadratic_eqprop_map: Default::default(),
439            },
440        })
441    }
442}
443
444fn plan_explainee(
445    scx: &StatementContext,
446    explainee: Explainee<Aug>,
447    params: &Params,
448) -> Result<plan::Explainee, PlanError> {
449    use crate::plan::ExplaineeStatement;
450
451    let is_replan = matches!(
452        explainee,
453        Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
454    );
455
456    let explainee = match explainee {
457        Explainee::View(name) | Explainee::ReplanView(name) => {
458            let item = scx.get_item_by_resolved_name(&name)?;
459            let item_type = item.item_type();
460            if item_type != CatalogItemType::View {
461                sql_bail!("Expected {name} to be a view, not a {item_type}");
462            }
463            match is_replan {
464                true => crate::plan::Explainee::ReplanView(item.id()),
465                false => crate::plan::Explainee::View(item.id()),
466            }
467        }
468        Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
469            let item = scx.get_item_by_resolved_name(&name)?;
470            let item_type = item.item_type();
471            if item_type != CatalogItemType::MaterializedView {
472                sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
473            }
474            match is_replan {
475                true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
476                false => crate::plan::Explainee::MaterializedView(item.id()),
477            }
478        }
479        Explainee::Index(name) | Explainee::ReplanIndex(name) => {
480            let item = scx.get_item_by_resolved_name(&name)?;
481            let item_type = item.item_type();
482            if item_type != CatalogItemType::Index {
483                sql_bail!("Expected {name} to be an index, not a {item_type}");
484            }
485            match is_replan {
486                true => crate::plan::Explainee::ReplanIndex(item.id()),
487                false => crate::plan::Explainee::Index(item.id()),
488            }
489        }
490        Explainee::Select(select, broken) => {
491            let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
492            crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
493        }
494        Explainee::CreateView(mut stmt, broken) => {
495            if stmt.if_exists != IfExistsBehavior::Skip {
496                // If we don't force this parameter to Skip planning will
497                // fail for names that already exist in the catalog. This
498                // can happen even in `Replace` mode if the existing item
499                // has dependencies.
500                stmt.if_exists = IfExistsBehavior::Skip;
501            } else {
502                sql_bail!(
503                    "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
504                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
505                );
506            }
507
508            let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt, params)? else {
509                sql_bail!("expected CreateViewPlan plan");
510            };
511
512            crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
513        }
514        Explainee::CreateMaterializedView(mut stmt, broken) => {
515            if stmt.if_exists != IfExistsBehavior::Skip {
516                // If we don't force this parameter to Skip planning will
517                // fail for names that already exist in the catalog. This
518                // can happen even in `Replace` mode if the existing item
519                // has dependencies.
520                stmt.if_exists = IfExistsBehavior::Skip;
521            } else {
522                sql_bail!(
523                    "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
524                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
525                );
526            }
527
528            let Plan::CreateMaterializedView(plan) =
529                ddl::plan_create_materialized_view(scx, *stmt, params)?
530            else {
531                sql_bail!("expected CreateMaterializedViewPlan plan");
532            };
533
534            crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
535                broken,
536                plan,
537            })
538        }
539        Explainee::CreateIndex(mut stmt, broken) => {
540            if !stmt.if_not_exists {
541                // If we don't force this parameter to true planning will
542                // fail for index items that already exist in the catalog.
543                stmt.if_not_exists = true;
544            } else {
545                sql_bail!(
546                    "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
547                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
548                );
549            }
550
551            let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
552                sql_bail!("expected CreateIndexPlan plan");
553            };
554
555            crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
556        }
557    };
558
559    Ok(explainee)
560}
561
562pub fn plan_explain_plan(
563    scx: &StatementContext,
564    explain: ExplainPlanStatement<Aug>,
565    params: &Params,
566) -> Result<Plan, PlanError> {
567    let format = match explain.format() {
568        mz_sql_parser::ast::ExplainFormat::Text => ExplainFormat::Text,
569        mz_sql_parser::ast::ExplainFormat::VerboseText => ExplainFormat::VerboseText,
570        mz_sql_parser::ast::ExplainFormat::Json => ExplainFormat::Json,
571        mz_sql_parser::ast::ExplainFormat::Dot => ExplainFormat::Dot,
572    };
573    let stage = explain.stage();
574
575    // Plan ExplainConfig.
576    let config = {
577        let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
578
579        if !scx.catalog.system_vars().persist_stats_filter_enabled() {
580            // If filtering is disabled, explain plans should not include pushdown info.
581            with_options.filter_pushdown = Some(false);
582        }
583
584        ExplainConfig::try_from(with_options)?
585    };
586
587    let explainee = plan_explainee(scx, explain.explainee, params)?;
588
589    Ok(Plan::ExplainPlan(ExplainPlanPlan {
590        stage,
591        format,
592        config,
593        explainee,
594    }))
595}
596
597pub fn plan_explain_schema(
598    scx: &StatementContext,
599    explain_schema: ExplainSinkSchemaStatement<Aug>,
600) -> Result<Plan, PlanError> {
601    let ExplainSinkSchemaStatement {
602        schema_for,
603        // Parser limits to JSON.
604        format: _,
605        mut statement,
606    } = explain_schema;
607
608    // Force the sink's name to one that's guaranteed not to exist, by virtue of
609    // being a non-existent item in a schema under the system's control, so that
610    // `plan_create_sink` doesn't complain about the name already existing.
611    statement.name = Some(UnresolvedItemName::qualified(&[
612        ident!("mz_catalog"),
613        ident!("mz_explain_schema"),
614    ]));
615
616    crate::pure::purify_create_sink_avro_doc_on_options(
617        scx.catalog,
618        *statement.from.item_id(),
619        &mut statement.format,
620    )?;
621
622    match ddl::plan_create_sink(scx, statement)? {
623        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
624            StorageSinkConnection::Kafka(KafkaSinkConnection {
625                format:
626                    KafkaSinkFormat {
627                        key_format,
628                        value_format:
629                            KafkaSinkFormatType::Avro {
630                                schema: value_schema,
631                                ..
632                            },
633                        ..
634                    },
635                ..
636            }) => {
637                let schema = match schema_for {
638                    ExplainSinkSchemaFor::Key => key_format
639                        .and_then(|f| match f {
640                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
641                            _ => None,
642                        })
643                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
644                    ExplainSinkSchemaFor::Value => value_schema,
645                };
646
647                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
648                    sink_from: sink.from,
649                    json_schema: schema,
650                }))
651            }
652            _ => bail_unsupported!(
653                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
654            ),
655        },
656        _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
657    }
658}
659
660pub fn plan_explain_pushdown(
661    scx: &StatementContext,
662    statement: ExplainPushdownStatement<Aug>,
663    params: &Params,
664) -> Result<Plan, PlanError> {
665    scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
666    let explainee = plan_explainee(scx, statement.explainee, params)?;
667    Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
668}
669
670pub fn plan_explain_timestamp(
671    scx: &StatementContext,
672    explain: ExplainTimestampStatement<Aug>,
673    params: &Params,
674) -> Result<Plan, PlanError> {
675    let format = match explain.format() {
676        mz_sql_parser::ast::ExplainFormat::Text => ExplainFormat::Text,
677        mz_sql_parser::ast::ExplainFormat::VerboseText => ExplainFormat::VerboseText,
678        mz_sql_parser::ast::ExplainFormat::Json => ExplainFormat::Json,
679        mz_sql_parser::ast::ExplainFormat::Dot => ExplainFormat::Dot,
680    };
681
682    let raw_plan = {
683        let query::PlannedRootQuery {
684            expr: mut raw_plan,
685            desc: _,
686            finishing: _,
687            scope: _,
688        } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
689        raw_plan.bind_parameters(params)?;
690
691        raw_plan
692    };
693    let when = query::plan_as_of(scx, explain.select.as_of)?;
694
695    Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
696        format,
697        raw_plan,
698        when,
699    }))
700}
701
702/// Plans and decorrelates a [`Query`]. Like [`query::plan_root_query`], but
703/// returns an [`MirRelationExpr`], which cannot include correlated expressions.
704#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."]
705pub fn plan_query(
706    scx: &StatementContext,
707    query: Query<Aug>,
708    params: &Params,
709    lifetime: QueryLifetime,
710) -> Result<query::PlannedRootQuery<MirRelationExpr>, PlanError> {
711    let query::PlannedRootQuery {
712        mut expr,
713        desc,
714        finishing,
715        scope,
716    } = query::plan_root_query(scx, query, lifetime)?;
717    expr.bind_parameters(params)?;
718
719    Ok(query::PlannedRootQuery {
720        // No metrics passed! One more reason not to use this deprecated function.
721        expr: expr.lower(scx.catalog.system_vars(), None)?,
722        desc,
723        finishing,
724        scope,
725    })
726}
727
// Generates `SubscribeOptionExtracted` for the `SubscribeOption` list. No
// defaults are declared here; `describe_subscribe` reads `progress` as an
// `Option` and treats an absent value as `false`.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
729
730pub fn describe_subscribe(
731    scx: &StatementContext,
732    stmt: SubscribeStatement<Aug>,
733) -> Result<StatementDesc, PlanError> {
734    let relation_desc = match stmt.relation {
735        SubscribeRelation::Name(name) => {
736            let item = scx.get_item_by_resolved_name(&name)?;
737            item.desc(&scx.catalog.resolve_full_name(item.name()))?
738                .into_owned()
739        }
740        SubscribeRelation::Query(query) => {
741            let query::PlannedRootQuery { desc, .. } =
742                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
743            desc
744        }
745    };
746    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
747    let progress = progress.unwrap_or(false);
748    let mut desc = RelationDesc::builder().with_column(
749        "mz_timestamp",
750        ScalarType::Numeric {
751            max_scale: Some(NumericMaxScale::ZERO),
752        }
753        .nullable(false),
754    );
755    if progress {
756        desc = desc.with_column("mz_progressed", ScalarType::Bool.nullable(false));
757    }
758
759    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
760    match stmt.output {
761        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
762            desc = desc.with_column("mz_diff", ScalarType::Int64.nullable(true));
763            for (name, mut ty) in relation_desc.into_iter() {
764                if progress {
765                    ty.nullable = true;
766                }
767                desc = desc.with_column(name, ty);
768            }
769        }
770        SubscribeOutput::EnvelopeUpsert { key_columns }
771        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
772            desc = desc.with_column("mz_state", ScalarType::String.nullable(true));
773            let key_columns = key_columns
774                .into_iter()
775                .map(normalize::column_name)
776                .collect_vec();
777            let mut before_values_desc = RelationDesc::builder();
778            let mut after_values_desc = RelationDesc::builder();
779
780            // Add the key columns in the order that they're specified.
781            for column_name in &key_columns {
782                let mut column_ty = relation_desc
783                    .get_by_name(column_name)
784                    .map(|(_pos, ty)| ty.clone())
785                    .ok_or_else(|| PlanError::UnknownColumn {
786                        table: None,
787                        column: column_name.clone(),
788                        similar: Box::new([]),
789                    })?;
790                if progress {
791                    column_ty.nullable = true;
792                }
793                desc = desc.with_column(column_name, column_ty);
794            }
795
796            // Then add the remaining columns in the order from the original
797            // table, filtering out the key columns since we added those above.
798            for (mut name, mut ty) in relation_desc
799                .into_iter()
800                .filter(|(name, _ty)| !key_columns.contains(name))
801            {
802                ty.nullable = true;
803                before_values_desc =
804                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
805                if debezium {
806                    name = format!("after_{}", name).into();
807                }
808                after_values_desc = after_values_desc.with_column(name, ty);
809            }
810
811            if debezium {
812                desc = desc.concat(before_values_desc);
813            }
814            desc = desc.concat(after_values_desc);
815        }
816    }
817    Ok(StatementDesc::new(Some(desc.finish())))
818}
819
820pub fn plan_subscribe(
821    scx: &StatementContext,
822    SubscribeStatement {
823        relation,
824        options,
825        as_of,
826        up_to,
827        output,
828    }: SubscribeStatement<Aug>,
829    params: &Params,
830    copy_to: Option<CopyFormat>,
831) -> Result<Plan, PlanError> {
832    let (from, desc, scope) = match relation {
833        SubscribeRelation::Name(name) => {
834            let entry = scx.get_item_by_resolved_name(&name)?;
835            let desc = match entry.desc(&scx.catalog.resolve_full_name(entry.name())) {
836                Ok(desc) => desc,
837                Err(..) => sql_bail!(
838                    "'{}' cannot be subscribed to because it is a {}",
839                    name.full_name_str(),
840                    entry.item_type(),
841                ),
842            };
843            let item_name = match name {
844                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
845                _ => None,
846            };
847            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
848            (
849                SubscribeFrom::Id(entry.global_id()),
850                desc.into_owned(),
851                scope,
852            )
853        }
854        SubscribeRelation::Query(query) => {
855            #[allow(deprecated)] // TODO(aalexandrov): Use HirRelationExpr in Subscribe
856            let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?;
857            // There's no way to apply finishing operations to a `SUBSCRIBE` directly, so the
858            // finishing should have already been turned into a `TopK` by
859            // `plan_query` / `plan_root_query`, upon seeing the `QueryLifetime::Subscribe`.
860            assert!(query.finishing.is_trivial(query.desc.arity()));
861            let desc = query.desc.clone();
862            (
863                SubscribeFrom::Query {
864                    expr: query.expr,
865                    desc: query.desc,
866                },
867                desc,
868                query.scope,
869            )
870        }
871    };
872
873    let when = query::plan_as_of(scx, as_of)?;
874    let up_to = up_to.map(|up_to| plan_up_to(scx, up_to)).transpose()?;
875
876    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
877    let ecx = ExprContext {
878        qcx: &qcx,
879        name: "",
880        scope: &scope,
881        relation_type: desc.typ(),
882        allow_aggregates: false,
883        allow_subqueries: true,
884        allow_parameters: true,
885        allow_windows: false,
886    };
887
888    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
889    let output = match output {
890        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
891        SubscribeOutput::EnvelopeUpsert { key_columns } => {
892            let order_by = key_columns
893                .iter()
894                .map(|ident| OrderByExpr {
895                    expr: Expr::Identifier(vec![ident.clone()]),
896                    asc: None,
897                    nulls_last: None,
898                })
899                .collect_vec();
900            let (order_by, map_exprs) = query::plan_order_by_exprs(
901                &ExprContext {
902                    name: "ENVELOPE UPSERT KEY clause",
903                    ..ecx
904                },
905                &order_by[..],
906                &output_columns[..],
907            )?;
908            if !map_exprs.is_empty() {
909                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
910            }
911            plan::SubscribeOutput::EnvelopeUpsert {
912                order_by_keys: order_by,
913            }
914        }
915        SubscribeOutput::EnvelopeDebezium { key_columns } => {
916            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
917            let order_by = key_columns
918                .iter()
919                .map(|ident| OrderByExpr {
920                    expr: Expr::Identifier(vec![ident.clone()]),
921                    asc: None,
922                    nulls_last: None,
923                })
924                .collect_vec();
925            let (order_by, map_exprs) = query::plan_order_by_exprs(
926                &ExprContext {
927                    name: "ENVELOPE DEBEZIUM KEY clause",
928                    ..ecx
929                },
930                &order_by[..],
931                &output_columns[..],
932            )?;
933            if !map_exprs.is_empty() {
934                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
935            }
936            plan::SubscribeOutput::EnvelopeDebezium {
937                order_by_keys: order_by,
938            }
939        }
940        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
941            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
942            let mz_diff = "mz_diff".into();
943            let output_columns = std::iter::once((0, &mz_diff))
944                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
945                .collect_vec();
946            match query::plan_order_by_exprs(
947                &ExprContext {
948                    name: "WITHIN TIMESTAMP ORDER BY clause",
949                    ..ecx
950                },
951                &order_by[..],
952                &output_columns[..],
953            ) {
954                Err(PlanError::UnknownColumn {
955                    table: None,
956                    column,
957                    similar: _,
958                }) if &column == &mz_diff => {
959                    // mz_diff is being used in an expression. Since mz_diff isn't part of the table
960                    // it looks like an unknown column. Instead, return a better error
961                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
962                }
963                Err(e) => return Err(e),
964                Ok((order_by, map_exprs)) => {
965                    if !map_exprs.is_empty() {
966                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
967                    }
968
969                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
970                }
971            }
972        }
973    };
974
975    let SubscribeOptionExtracted {
976        progress, snapshot, ..
977    } = options.try_into()?;
978    Ok(Plan::Subscribe(SubscribePlan {
979        from,
980        when,
981        up_to,
982        with_snapshot: snapshot.unwrap_or(true),
983        copy_to,
984        emit_progress: progress.unwrap_or(false),
985        output,
986    }))
987}
988
989pub fn describe_copy_from_table(
990    scx: &StatementContext,
991    table_name: <Aug as AstInfo>::ItemName,
992    columns: Vec<Ident>,
993) -> Result<StatementDesc, PlanError> {
994    let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
995    Ok(StatementDesc::new(Some(desc)))
996}
997
998pub fn describe_copy_item(
999    scx: &StatementContext,
1000    object_name: <Aug as AstInfo>::ItemName,
1001    columns: Vec<Ident>,
1002) -> Result<StatementDesc, PlanError> {
1003    let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1004    Ok(StatementDesc::new(Some(desc)))
1005}
1006
1007pub fn describe_copy(
1008    scx: &StatementContext,
1009    CopyStatement {
1010        relation,
1011        direction,
1012        ..
1013    }: CopyStatement<Aug>,
1014) -> Result<StatementDesc, PlanError> {
1015    Ok(match (relation, direction) {
1016        (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1017            describe_copy_item(scx, name, columns)?
1018        }
1019        (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1020            describe_copy_from_table(scx, name, columns)?
1021        }
1022        (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1023        (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1024    }
1025    .with_is_copy())
1026}
1027
1028fn plan_copy_to_expr(
1029    scx: &StatementContext,
1030    select_plan: SelectPlan,
1031    desc: RelationDesc,
1032    to: &Expr<Aug>,
1033    format: CopyFormat,
1034    options: CopyOptionExtracted,
1035) -> Result<Plan, PlanError> {
1036    let conn_id = match options.aws_connection {
1037        Some(conn_id) => CatalogItemId::from(conn_id),
1038        None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1039    };
1040    let connection = scx.get_item(&conn_id).connection()?;
1041
1042    match connection {
1043        mz_storage_types::connections::Connection::Aws(_) => {}
1044        _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1045    }
1046
1047    let format = match format {
1048        CopyFormat::Csv => {
1049            let quote = extract_byte_param_value(options.quote, "quote")?;
1050            let escape = extract_byte_param_value(options.escape, "escape")?;
1051            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1052            S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1053                CopyCsvFormatParams::try_new(
1054                    delimiter,
1055                    quote,
1056                    escape,
1057                    options.header,
1058                    options.null,
1059                )
1060                .map_err(|e| sql_err!("{}", e))?,
1061            ))
1062        }
1063        CopyFormat::Parquet => {
1064            // Validate that the output desc can be formatted as parquet
1065            ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
1066            S3SinkFormat::Parquet
1067        }
1068        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1069        CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1070    };
1071
1072    // Converting the to expr to a HirScalarExpr
1073    let mut to_expr = to.clone();
1074    transform_ast::transform(scx, &mut to_expr)?;
1075    let relation_type = RelationDesc::empty();
1076    let ecx = &ExprContext {
1077        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1078        name: "COPY TO target",
1079        scope: &Scope::empty(),
1080        relation_type: relation_type.typ(),
1081        allow_aggregates: false,
1082        allow_subqueries: false,
1083        allow_parameters: false,
1084        allow_windows: false,
1085    };
1086
1087    let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &ScalarType::String)?;
1088
1089    if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1090        sql_bail!(
1091            "MAX FILE SIZE cannot be less than {}",
1092            MIN_S3_SINK_FILE_SIZE
1093        );
1094    }
1095    if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1096        sql_bail!(
1097            "MAX FILE SIZE cannot be greater than {}",
1098            MAX_S3_SINK_FILE_SIZE
1099        );
1100    }
1101
1102    Ok(Plan::CopyTo(CopyToPlan {
1103        select_plan,
1104        desc,
1105        to,
1106        connection: connection.to_owned(),
1107        connection_id: conn_id,
1108        format,
1109        max_file_size: options.max_file_size.as_bytes(),
1110    }))
1111}
1112
1113fn plan_copy_from(
1114    scx: &StatementContext,
1115    target: &CopyTarget<Aug>,
1116    table_name: ResolvedItemName,
1117    columns: Vec<Ident>,
1118    format: CopyFormat,
1119    options: CopyOptionExtracted,
1120) -> Result<Plan, PlanError> {
1121    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
1122        match option {
1123            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
1124            None => Ok(()),
1125        }
1126    }
1127
1128    let source = match target {
1129        CopyTarget::Stdin => CopyFromSource::Stdin,
1130        CopyTarget::Expr(from) => {
1131            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;
1132
1133            // Converting the expr to an HirScalarExpr
1134            let mut from_expr = from.clone();
1135            transform_ast::transform(scx, &mut from_expr)?;
1136            let relation_type = RelationDesc::empty();
1137            let ecx = &ExprContext {
1138                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1139                name: "COPY FROM target",
1140                scope: &Scope::empty(),
1141                relation_type: relation_type.typ(),
1142                allow_aggregates: false,
1143                allow_subqueries: false,
1144                allow_parameters: false,
1145                allow_windows: false,
1146            };
1147            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &ScalarType::String)?;
1148
1149            match options.aws_connection {
1150                Some(conn_id) => {
1151                    let conn_id = CatalogItemId::from(conn_id);
1152
1153                    // Validate the connection type is one we expect.
1154                    let connection = match scx.get_item(&conn_id).connection()? {
1155                        mz_storage_types::connections::Connection::Aws(conn) => conn,
1156                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
1157                    };
1158
1159                    CopyFromSource::AwsS3 {
1160                        uri: from,
1161                        connection,
1162                        connection_id: conn_id,
1163                    }
1164                }
1165                None => CopyFromSource::Url(from),
1166            }
1167        }
1168        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
1169    };
1170
1171    let params = match format {
1172        CopyFormat::Text => {
1173            only_available_with_csv(options.quote, "quote")?;
1174            only_available_with_csv(options.escape, "escape")?;
1175            only_available_with_csv(options.header, "HEADER")?;
1176            let delimiter =
1177                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
1178            let null = match options.null {
1179                Some(null) => Cow::from(null),
1180                None => Cow::from("\\N"),
1181            };
1182            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
1183        }
1184        CopyFormat::Csv => {
1185            let quote = extract_byte_param_value(options.quote, "quote")?;
1186            let escape = extract_byte_param_value(options.escape, "escape")?;
1187            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1188            CopyFormatParams::Csv(
1189                CopyCsvFormatParams::try_new(
1190                    delimiter,
1191                    quote,
1192                    escape,
1193                    options.header,
1194                    options.null,
1195                )
1196                .map_err(|e| sql_err!("{}", e))?,
1197            )
1198        }
1199        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1200        CopyFormat::Parquet => CopyFormatParams::Parquet,
1201    };
1202
1203    let filter = match (options.files, options.pattern) {
1204        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
1205        (Some(files), None) => Some(CopyFromFilter::Files(files)),
1206        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
1207        (None, None) => None,
1208    };
1209
1210    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
1211        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
1212    }
1213
1214    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;
1215
1216    let Some(mfp) = maybe_mfp else {
1217        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
1218    };
1219
1220    Ok(Plan::CopyFrom(CopyFromPlan {
1221        id,
1222        source,
1223        columns,
1224        source_desc,
1225        mfp,
1226        params,
1227        filter,
1228    }))
1229}
1230
1231fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
1232    match v {
1233        Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
1234        Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
1235        None => Ok(None),
1236    }
1237}
1238
// Generates `CopyOptionExtracted`, the typed form of the `COPY` statement
// options; `plan_copy` builds it with `CopyOptionExtracted::try_from`.
//
// `MaxFileSize` is the only option declared with a default (256 MB). The
// planning functions in this file treat the other options as optional values
// (for example, `aws_connection` is matched against `Some` / `None`).
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
1252
1253pub fn plan_copy(
1254    scx: &StatementContext,
1255    CopyStatement {
1256        relation,
1257        direction,
1258        target,
1259        options,
1260    }: CopyStatement<Aug>,
1261) -> Result<Plan, PlanError> {
1262    let options = CopyOptionExtracted::try_from(options)?;
1263    // Parse any user-provided FORMAT option. If not provided, will default to
1264    // Text for COPY TO STDOUT and COPY FROM STDIN, but will error for COPY TO <expr>.
1265    let format = options
1266        .format
1267        .as_ref()
1268        .map(|format| match format.to_lowercase().as_str() {
1269            "text" => Ok(CopyFormat::Text),
1270            "csv" => Ok(CopyFormat::Csv),
1271            "binary" => Ok(CopyFormat::Binary),
1272            "parquet" => Ok(CopyFormat::Parquet),
1273            _ => sql_bail!("unknown FORMAT: {}", format),
1274        })
1275        .transpose()?;
1276
1277    match (&direction, &target) {
1278        (CopyDirection::To, CopyTarget::Stdout) => {
1279            if options.delimiter.is_some() {
1280                sql_bail!("COPY TO does not support DELIMITER option yet");
1281            }
1282            if options.quote.is_some() {
1283                sql_bail!("COPY TO does not support QUOTE option yet");
1284            }
1285            if options.null.is_some() {
1286                sql_bail!("COPY TO does not support NULL option yet");
1287            }
1288            match relation {
1289                CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
1290                CopyRelation::Select(stmt) => Ok(plan_select(
1291                    scx,
1292                    stmt,
1293                    &Params::empty(),
1294                    Some(format.unwrap_or(CopyFormat::Text)),
1295                )?),
1296                CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
1297                    scx,
1298                    stmt,
1299                    &Params::empty(),
1300                    Some(format.unwrap_or(CopyFormat::Text)),
1301                )?),
1302            }
1303        }
1304        (CopyDirection::From, target) => match relation {
1305            CopyRelation::Named { name, columns } => plan_copy_from(
1306                scx,
1307                target,
1308                name,
1309                columns,
1310                format.unwrap_or(CopyFormat::Text),
1311                options,
1312            ),
1313            _ => sql_bail!("COPY FROM {} not supported", target),
1314        },
1315        (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
1316            // System users are always allowed to use this feature, even when
1317            // the flag is disabled, so that we can dogfood for analytics in
1318            // production environments. The feature is stable enough that we're
1319            // not worried about it crashing.
1320            if !scx.catalog.active_role_id().is_system() {
1321                scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
1322            }
1323
1324            let format = match format {
1325                Some(inner) => inner,
1326                _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
1327            };
1328
1329            let stmt = match relation {
1330                CopyRelation::Named { name, columns } => {
1331                    if !columns.is_empty() {
1332                        // TODO(mouli): Add support for this
1333                        sql_bail!(
1334                            "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
1335                        );
1336                    }
1337                    // Generate a synthetic SELECT query that just gets the table
1338                    let query = Query {
1339                        ctes: CteBlock::empty(),
1340                        body: SetExpr::Table(name),
1341                        order_by: vec![],
1342                        limit: None,
1343                        offset: None,
1344                    };
1345                    SelectStatement { query, as_of: None }
1346                }
1347                CopyRelation::Select(stmt) => {
1348                    if !stmt.query.order_by.is_empty() {
1349                        sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
1350                    }
1351                    stmt
1352                }
1353                _ => sql_bail!("COPY {} {} not supported", direction, target),
1354            };
1355
1356            let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
1357            plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
1358        }
1359        _ => sql_bail!("COPY {} {} not supported", direction, target),
1360    }
1361}