mz_sql/plan/statement/
dml.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data manipulation language (DML).
11//!
12//! This module houses the handlers for statements that manipulate data, like
13//! `INSERT`, `SELECT`, `SUBSCRIBE`, and `COPY`.
14
15use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19
20use mz_arrow_util::builder::ArrowBuilder;
21use mz_expr::visit::Visit;
22use mz_expr::{MirRelationExpr, RowSetFinishing};
23use mz_ore::num::NonNeg;
24use mz_ore::soft_panic_or_log;
25use mz_ore::str::separated;
26use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
27use mz_repr::adt::numeric::NumericMaxScale;
28use mz_repr::bytes::ByteSize;
29use mz_repr::explain::{ExplainConfig, ExplainFormat};
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
32use mz_sql_parser::ast::{
33    CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
34    ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
35    ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
36    ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
37    SetExpr, SubscribeOutput, UnresolvedItemName,
38};
39use mz_sql_parser::ident;
40use mz_storage_types::sinks::{
41    KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
42    MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
43};
44
45use crate::ast::display::AstDisplay;
46use crate::ast::{
47    AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
48    DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
49    SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
50    UpdateStatement,
51};
52use crate::catalog::CatalogItemType;
53use crate::names::{Aug, ResolvedItemName};
54use crate::normalize;
55use crate::plan::query::{
56    ExprContext, QueryLifetime, offset_into_value, plan_as_of_or_up_to, plan_expr,
57};
58use crate::plan::scope::Scope;
59use crate::plan::statement::show::ShowSelect;
60use crate::plan::statement::{StatementContext, StatementDesc, ddl};
61use crate::plan::{
62    self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
63    ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
64};
65use crate::plan::{
66    CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
67    QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
68};
69use crate::plan::{CopyFromSource, with_options};
70use crate::session::vars::{self, ENABLE_COPY_FROM_REMOTE};
71
72// TODO(benesch): currently, describing a `SELECT` or `INSERT` query
73// plans the whole query to determine its shape and parameter types,
74// and then throws away that plan. If we were smarter, we'd stash that
75// plan somewhere so we don't have to recompute it when the query is
76// executed.
77
78pub fn describe_insert(
79    scx: &StatementContext,
80    InsertStatement {
81        table_name,
82        columns,
83        source,
84        returning,
85    }: InsertStatement<Aug>,
86) -> Result<StatementDesc, PlanError> {
87    let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
88    let desc = if returning.expr.is_empty() {
89        None
90    } else {
91        Some(returning.desc)
92    };
93    Ok(StatementDesc::new(desc))
94}
95
96pub fn plan_insert(
97    scx: &StatementContext,
98    InsertStatement {
99        table_name,
100        columns,
101        source,
102        returning,
103    }: InsertStatement<Aug>,
104    params: &Params,
105) -> Result<Plan, PlanError> {
106    let (id, mut expr, returning) =
107        query::plan_insert_query(scx, table_name, columns, source, returning)?;
108    expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
109    let returning = returning
110        .expr
111        .into_iter()
112        .map(|mut expr| {
113            expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
114            expr.lower_uncorrelated()
115        })
116        .collect::<Result<Vec<_>, _>>()?;
117
118    Ok(Plan::Insert(InsertPlan {
119        id,
120        values: expr,
121        returning,
122    }))
123}
124
125pub fn describe_delete(
126    scx: &StatementContext,
127    stmt: DeleteStatement<Aug>,
128) -> Result<StatementDesc, PlanError> {
129    query::plan_delete_query(scx, stmt)?;
130    Ok(StatementDesc::new(None))
131}
132
133pub fn plan_delete(
134    scx: &StatementContext,
135    stmt: DeleteStatement<Aug>,
136    params: &Params,
137) -> Result<Plan, PlanError> {
138    let rtw_plan = query::plan_delete_query(scx, stmt)?;
139    plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
140}
141
142pub fn describe_update(
143    scx: &StatementContext,
144    stmt: UpdateStatement<Aug>,
145) -> Result<StatementDesc, PlanError> {
146    query::plan_update_query(scx, stmt)?;
147    Ok(StatementDesc::new(None))
148}
149
150pub fn plan_update(
151    scx: &StatementContext,
152    stmt: UpdateStatement<Aug>,
153    params: &Params,
154) -> Result<Plan, PlanError> {
155    let rtw_plan = query::plan_update_query(scx, stmt)?;
156    plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
157}
158
159pub fn plan_read_then_write(
160    scx: &StatementContext,
161    kind: MutationKind,
162    params: &Params,
163    query::ReadThenWritePlan {
164        id,
165        mut selection,
166        finishing,
167        assignments,
168    }: query::ReadThenWritePlan,
169) -> Result<Plan, PlanError> {
170    selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
171    let mut assignments_outer = BTreeMap::new();
172    for (idx, mut set) in assignments {
173        set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
174        let set = set.lower_uncorrelated()?;
175        assignments_outer.insert(idx, set);
176    }
177
178    Ok(Plan::ReadThenWrite(ReadThenWritePlan {
179        id,
180        selection,
181        finishing,
182        assignments: assignments_outer,
183        kind,
184        returning: Vec::new(),
185    }))
186}
187
188pub fn describe_select(
189    scx: &StatementContext,
190    stmt: SelectStatement<Aug>,
191) -> Result<StatementDesc, PlanError> {
192    if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
193        return Ok(StatementDesc::new(Some(desc)));
194    }
195
196    let query::PlannedRootQuery { desc, .. } =
197        query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
198    Ok(StatementDesc::new(Some(desc)))
199}
200
201pub fn plan_select(
202    scx: &StatementContext,
203    select: SelectStatement<Aug>,
204    params: &Params,
205    copy_to: Option<CopyFormat>,
206) -> Result<Plan, PlanError> {
207    if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
208        return Ok(Plan::SideEffectingFunc(f));
209    }
210
211    let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
212    Ok(Plan::Select(plan))
213}
214
/// Shared planning logic for `SELECT` statements, used both by [`plan_select`]
/// and by `EXPLAIN ... SELECT` (via `plan_explainee`).
///
/// Plans the root query, binds placeholder parameters, and reduces the
/// top-level `LIMIT`/`OFFSET` to concrete values, returning the resulting
/// [`SelectPlan`] together with the relation description of its output.
fn plan_select_inner(
    scx: &StatementContext,
    select: SelectStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<(SelectPlan, RelationDesc), PlanError> {
    // Plan the `AS OF` clause (if any) before `select` is consumed below.
    let when = query::plan_as_of(scx, select.as_of.clone())?;
    let lifetime = QueryLifetime::OneShot;
    let query::PlannedRootQuery {
        mut expr,
        desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
    expr.bind_parameters(scx, lifetime, params)?;

    // OFFSET clauses in `expr` should become constants with the above binding of parameters.
    // Let's check this and simplify them to literals.
    expr.try_visit_mut_pre(&mut |expr| {
        if let HirRelationExpr::TopK { offset, .. } = expr {
            let offset_value = offset_into_value(offset.take())?;
            *offset = HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64);
        }
        Ok::<(), PlanError>(())
    })?;
    // (We don't need to simplify LIMIT clauses in `expr`, because we can handle non-constant
    // expressions there. If they happen to be simplifiable to literals, then the optimizer will do
    // so later.)

    // We need to concretize the `limit` and `offset` of the RowSetFinishing, so that we go from
    // `RowSetFinishing<HirScalarExpr, HirScalarExpr>` to `RowSetFinishing`.
    // This involves binding parameters and evaluating each expression to a number.
    // (This should be possible even for `limit` here, because we are at the top level of a SELECT,
    // so this `limit` has to be a constant.)
    let limit = match finishing.limit {
        None => None,
        Some(mut limit) => {
            limit.bind_parameters(scx, lifetime, params)?;
            // TODO: Call `try_into_literal_int64` instead of `as_literal`.
            let Some(limit) = limit.as_literal() else {
                sql_bail!(
                    "Top-level LIMIT must be a constant expression, got {}",
                    limit
                )
            };
            // A NULL limit means "no limit"; a non-negative Int64 is kept;
            // anything else is rejected (and flagged as an internal
            // invariant violation in debug builds).
            match limit {
                Datum::Null => None,
                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
                _ => {
                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
                    sql_bail!("LIMIT must be a non-negative INT or NULL")
                }
            }
        }
    };
    let offset = {
        let mut offset = finishing.offset.clone();
        offset.bind_parameters(scx, lifetime, params)?;
        let offset = offset_into_value(offset.take())?;
        offset
            .try_into()
            .expect("checked in offset_into_value that it is not negative")
    };

    // Assemble the final plan, retaining the original `select` AST on it.
    let plan = SelectPlan {
        source: expr,
        when,
        finishing: RowSetFinishing {
            limit,
            offset,
            project: finishing.project,
            order_by: finishing.order_by,
        },
        copy_to,
        select: Some(Box::new(select)),
    };

    Ok((plan, desc))
}
294
295pub fn describe_explain_plan(
296    scx: &StatementContext,
297    explain: ExplainPlanStatement<Aug>,
298) -> Result<StatementDesc, PlanError> {
299    let mut relation_desc = RelationDesc::builder();
300
301    match explain.stage() {
302        ExplainStage::RawPlan => {
303            let name = "Raw Plan";
304            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
305        }
306        ExplainStage::DecorrelatedPlan => {
307            let name = "Decorrelated Plan";
308            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
309        }
310        ExplainStage::LocalPlan => {
311            let name = "Locally Optimized Plan";
312            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
313        }
314        ExplainStage::GlobalPlan => {
315            let name = "Optimized Plan";
316            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
317        }
318        ExplainStage::PhysicalPlan => {
319            let name = "Physical Plan";
320            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
321        }
322        ExplainStage::Trace => {
323            relation_desc = relation_desc
324                .with_column("Time", SqlScalarType::UInt64.nullable(false))
325                .with_column("Path", SqlScalarType::String.nullable(false))
326                .with_column("Plan", SqlScalarType::String.nullable(false));
327        }
328        ExplainStage::PlanInsights => {
329            let name = "Plan Insights";
330            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
331        }
332    };
333    let relation_desc = relation_desc.finish();
334
335    Ok(
336        StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
337            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
338            _ => vec![],
339        }),
340    )
341}
342
343pub fn describe_explain_pushdown(
344    scx: &StatementContext,
345    statement: ExplainPushdownStatement<Aug>,
346) -> Result<StatementDesc, PlanError> {
347    let relation_desc = RelationDesc::builder()
348        .with_column("Source", SqlScalarType::String.nullable(false))
349        .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
350        .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
351        .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
352        .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
353        .finish();
354
355    Ok(
356        StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
357            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
358            _ => vec![],
359        }),
360    )
361}
362
/// Computes the statement description for `EXPLAIN ANALYZE ... <object>`.
///
/// With `AS SQL`, the output is a single `SQL` column; otherwise the columns
/// depend on the requested computation properties (memory/CPU, or hints) and
/// on whether `SKEW` was specified.
pub fn describe_explain_analyze_object(
    _scx: &StatementContext,
    statement: ExplainAnalyzeObjectStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    match statement.properties {
        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
            properties,
            skew,
        }) => {
            let mut relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(false));

            // With SKEW, rows are broken out per worker, so include the
            // worker id as a column.
            if skew {
                relation_desc =
                    relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
            }

            let mut seen_properties = BTreeSet::new();
            for property in properties {
                // handle each property only once (belt and suspenders)
                if !seen_properties.insert(property) {
                    continue;
                }

                match property {
                    // Memory with SKEW: per-worker memory/records columns
                    // alongside averages, totals, and skew ratios.
                    ExplainAnalyzeComputationProperty::Memory if skew => {
                        let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                        relation_desc = relation_desc
                            .with_column("memory_ratio", numeric.clone())
                            .with_column("worker_memory", SqlScalarType::String.nullable(true))
                            .with_column("avg_memory", SqlScalarType::String.nullable(true))
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column("records_ratio", numeric.clone())
                            .with_column("worker_records", numeric.clone())
                            .with_column("avg_records", numeric.clone())
                            .with_column("total_records", numeric);
                    }
                    // Memory without SKEW: totals only.
                    ExplainAnalyzeComputationProperty::Memory => {
                        relation_desc = relation_desc
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column(
                                "total_records",
                                SqlScalarType::Numeric { max_scale: None }.nullable(true),
                            );
                    }
                    // CPU: always include total elapsed time; with SKEW, also
                    // per-worker and average elapsed time plus the skew ratio.
                    ExplainAnalyzeComputationProperty::Cpu => {
                        if skew {
                            relation_desc = relation_desc
                                .with_column(
                                    "cpu_ratio",
                                    SqlScalarType::Numeric { max_scale: None }.nullable(true),
                                )
                                .with_column(
                                    "worker_elapsed",
                                    SqlScalarType::Interval.nullable(true),
                                )
                                .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
                        }
                        relation_desc = relation_desc
                            .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
                    }
                }
            }

            let relation_desc = relation_desc.finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
        ExplainAnalyzeProperty::Hints => {
            let relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(true))
                .with_column("levels", SqlScalarType::Int64.nullable(true))
                .with_column("to_cut", SqlScalarType::Int64.nullable(true))
                .with_column("hint", SqlScalarType::Float64.nullable(true))
                .with_column("savings", SqlScalarType::String.nullable(true))
                .finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
    }
}
449
/// Computes the statement description for `EXPLAIN ANALYZE ... CLUSTER`.
///
/// With `AS SQL`, the output is a single `SQL` column; otherwise rows carry an
/// object name and global id, with further columns determined by the requested
/// computation properties and by whether `SKEW` was specified.
pub fn describe_explain_analyze_cluster(
    _scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
) -> Result<StatementDesc, PlanError> {
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;

    let mut relation_desc = RelationDesc::builder()
        .with_column("object", SqlScalarType::String.nullable(false))
        .with_column("global_id", SqlScalarType::String.nullable(false));

    // With SKEW, rows are broken out per worker, so include the worker id as
    // a column.
    if skew {
        relation_desc =
            relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
    }

    let mut seen_properties = BTreeSet::new();
    for property in properties {
        // handle each property only once (belt and suspenders)
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            // Memory with SKEW: per-worker memory/records columns alongside
            // averages, totals, and per-operator skew ratios.
            ExplainAnalyzeComputationProperty::Memory if skew => {
                let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                relation_desc = relation_desc
                    .with_column("max_operator_memory_ratio", numeric.clone())
                    .with_column("worker_memory", SqlScalarType::String.nullable(true))
                    .with_column("avg_memory", SqlScalarType::String.nullable(true))
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column("max_operator_records_ratio", numeric.clone())
                    .with_column("worker_records", numeric.clone())
                    .with_column("avg_records", numeric.clone())
                    .with_column("total_records", numeric);
            }
            // Memory without SKEW: totals only.
            ExplainAnalyzeComputationProperty::Memory => {
                relation_desc = relation_desc
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column(
                        "total_records",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    );
            }
            // CPU with SKEW: per-worker and average elapsed time plus the
            // per-operator skew ratio, in addition to the total.
            ExplainAnalyzeComputationProperty::Cpu if skew => {
                relation_desc = relation_desc
                    .with_column(
                        "max_operator_cpu_ratio",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    )
                    .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
            // CPU without SKEW: total elapsed time only.
            ExplainAnalyzeComputationProperty::Cpu => {
                relation_desc = relation_desc
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
        }
    }

    Ok(StatementDesc::new(Some(relation_desc.finish())))
}
519
520pub fn describe_explain_timestamp(
521    scx: &StatementContext,
522    ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
523) -> Result<StatementDesc, PlanError> {
524    let relation_desc = RelationDesc::builder()
525        .with_column("Timestamp", SqlScalarType::String.nullable(false))
526        .finish();
527
528    Ok(StatementDesc::new(Some(relation_desc))
529        .with_params(describe_select(scx, select)?.param_types))
530}
531
532pub fn describe_explain_schema(
533    _: &StatementContext,
534    ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
535) -> Result<StatementDesc, PlanError> {
536    let relation_desc = RelationDesc::builder()
537        .with_column("Schema", SqlScalarType::String.nullable(false))
538        .finish();
539    Ok(StatementDesc::new(Some(relation_desc)))
540}
541
// Currently, there are two reasons for why a flag should be `Option<bool>` instead of simply
// `bool`:
// - When it's an override of a global feature flag, for example optimizer feature flags. In this
//   case, we need not just false and true, but also None to say "take the value of the global
//   flag".
// - When it's an override of whether SOFT_ASSERTIONS are enabled. For example, when `Arity` is not
//   explicitly given in the EXPLAIN command, then we'd like staging and prod to default to true,
//   but otherwise we'd like to default to false.
//
// Generates `ExplainPlanOptionExtracted` with one field per option listed
// here; the `TryFrom` impl below maps it onto `ExplainConfig`.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
584
impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
    type Error = PlanError;

    /// Converts the extracted `WITH (...)` options of an `EXPLAIN` statement
    /// into an [`ExplainConfig`], resolving `Option<bool>` flags whose
    /// defaults depend on whether soft assertions are enabled (see the
    /// comment above `generate_extracted_config!`).
    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
        // If `WITH(raw)` is specified, ensure that the config will be as
        // representative for the original plan as possible.
        if v.raw {
            v.raw_plans = true;
            v.raw_syntax = true;
        }

        // Certain config should default to be enabled in release builds running on
        // staging or prod (where SOFT_ASSERTIONS are turned off).
        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();

        Ok(ExplainConfig {
            arity: v.arity.unwrap_or(enable_on_prod),
            cardinality: v.cardinality,
            column_names: v.column_names,
            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
            // Humanized expressions are suppressed when raw plans are requested.
            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
            join_impls: v.join_implementations,
            keys: v.keys,
            linear_chains: !v.raw_plans && v.linear_chains,
            no_fast_path: v.no_fast_path,
            no_notices: v.no_notices,
            node_ids: v.node_identifiers,
            non_negative: v.non_negative,
            raw_plans: v.raw_plans,
            raw_syntax: v.raw_syntax,
            verbose_syntax: false,
            redacted: v.redacted,
            subtree_size: v.subtree_size,
            equivalences: v.equivalences,
            timing: v.timing,
            types: v.types,
            // The ones that are initialized with `Default::default()` are not wired up to EXPLAIN.
            features: OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: Default::default(),
                enable_eager_delta_joins: v.enable_eager_delta_joins,
                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
                enable_consolidate_after_union_negate: Default::default(),
                enable_reduce_mfp_fusion: Default::default(),
                enable_cardinality_estimates: Default::default(),
                persist_fast_path_limit: Default::default(),
                reoptimize_imported_views: v.reoptimize_imported_views,
                enable_reduce_reduction: Default::default(),
                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse: v
                    .enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: Default::default(),
                enable_dequadratic_eqprop_map: Default::default(),
                enable_eq_classes_withholding_errors: Default::default(),
                enable_fast_path_plan_insights: Default::default(),
                enable_repr_typecheck: Default::default(),
            },
        })
    }
}
646
647fn plan_explainee(
648    scx: &StatementContext,
649    explainee: Explainee<Aug>,
650    params: &Params,
651) -> Result<plan::Explainee, PlanError> {
652    use crate::plan::ExplaineeStatement;
653
654    let is_replan = matches!(
655        explainee,
656        Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
657    );
658
659    let explainee = match explainee {
660        Explainee::View(name) | Explainee::ReplanView(name) => {
661            let item = scx.get_item_by_resolved_name(&name)?;
662            let item_type = item.item_type();
663            if item_type != CatalogItemType::View {
664                sql_bail!("Expected {name} to be a view, not a {item_type}");
665            }
666            match is_replan {
667                true => crate::plan::Explainee::ReplanView(item.id()),
668                false => crate::plan::Explainee::View(item.id()),
669            }
670        }
671        Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
672            let item = scx.get_item_by_resolved_name(&name)?;
673            let item_type = item.item_type();
674            if item_type != CatalogItemType::MaterializedView {
675                sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
676            }
677            match is_replan {
678                true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
679                false => crate::plan::Explainee::MaterializedView(item.id()),
680            }
681        }
682        Explainee::Index(name) | Explainee::ReplanIndex(name) => {
683            let item = scx.get_item_by_resolved_name(&name)?;
684            let item_type = item.item_type();
685            if item_type != CatalogItemType::Index {
686                sql_bail!("Expected {name} to be an index, not a {item_type}");
687            }
688            match is_replan {
689                true => crate::plan::Explainee::ReplanIndex(item.id()),
690                false => crate::plan::Explainee::Index(item.id()),
691            }
692        }
693        Explainee::Select(select, broken) => {
694            let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
695            crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
696        }
697        Explainee::CreateView(mut stmt, broken) => {
698            if stmt.if_exists != IfExistsBehavior::Skip {
699                // If we don't force this parameter to Skip planning will
700                // fail for names that already exist in the catalog. This
701                // can happen even in `Replace` mode if the existing item
702                // has dependencies.
703                stmt.if_exists = IfExistsBehavior::Skip;
704            } else {
705                sql_bail!(
706                    "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
707                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
708                );
709            }
710
711            let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
712                sql_bail!("expected CreateViewPlan plan");
713            };
714
715            crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
716        }
717        Explainee::CreateMaterializedView(mut stmt, broken) => {
718            if stmt.if_exists != IfExistsBehavior::Skip {
719                // If we don't force this parameter to Skip planning will
720                // fail for names that already exist in the catalog. This
721                // can happen even in `Replace` mode if the existing item
722                // has dependencies.
723                stmt.if_exists = IfExistsBehavior::Skip;
724            } else {
725                sql_bail!(
726                    "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
727                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
728                );
729            }
730
731            let Plan::CreateMaterializedView(plan) =
732                ddl::plan_create_materialized_view(scx, *stmt)?
733            else {
734                sql_bail!("expected CreateMaterializedViewPlan plan");
735            };
736
737            crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
738                broken,
739                plan,
740            })
741        }
742        Explainee::CreateIndex(mut stmt, broken) => {
743            if !stmt.if_not_exists {
744                // If we don't force this parameter to true planning will
745                // fail for index items that already exist in the catalog.
746                stmt.if_not_exists = true;
747            } else {
748                sql_bail!(
749                    "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
750                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
751                );
752            }
753
754            let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
755                sql_bail!("expected CreateIndexPlan plan");
756            };
757
758            crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
759        }
760    };
761
762    Ok(explainee)
763}
764
765pub fn plan_explain_plan(
766    scx: &StatementContext,
767    explain: ExplainPlanStatement<Aug>,
768    params: &Params,
769) -> Result<Plan, PlanError> {
770    let (format, verbose_syntax) = match explain.format() {
771        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
772        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
773        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
774        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
775    };
776    let stage = explain.stage();
777
778    // Plan ExplainConfig.
779    let mut config = {
780        let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
781
782        if !scx.catalog.system_vars().persist_stats_filter_enabled() {
783            // If filtering is disabled, explain plans should not include pushdown info.
784            with_options.filter_pushdown = Some(false);
785        }
786
787        ExplainConfig::try_from(with_options)?
788    };
789    config.verbose_syntax = verbose_syntax;
790
791    let explainee = plan_explainee(scx, explain.explainee, params)?;
792
793    Ok(Plan::ExplainPlan(ExplainPlanPlan {
794        stage,
795        format,
796        config,
797        explainee,
798    }))
799}
800
801pub fn plan_explain_schema(
802    scx: &StatementContext,
803    explain_schema: ExplainSinkSchemaStatement<Aug>,
804) -> Result<Plan, PlanError> {
805    let ExplainSinkSchemaStatement {
806        schema_for,
807        // Parser limits to JSON.
808        format: _,
809        mut statement,
810    } = explain_schema;
811
812    // Force the sink's name to one that's guaranteed not to exist, by virtue of
813    // being a non-existent item in a schema under the system's control, so that
814    // `plan_create_sink` doesn't complain about the name already existing.
815    statement.name = Some(UnresolvedItemName::qualified(&[
816        ident!("mz_catalog"),
817        ident!("mz_explain_schema"),
818    ]));
819
820    crate::pure::purify_create_sink_avro_doc_on_options(
821        scx.catalog,
822        *statement.from.item_id(),
823        &mut statement.format,
824    )?;
825
826    match ddl::plan_create_sink(scx, statement)? {
827        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
828            StorageSinkConnection::Kafka(KafkaSinkConnection {
829                format:
830                    KafkaSinkFormat {
831                        key_format,
832                        value_format:
833                            KafkaSinkFormatType::Avro {
834                                schema: value_schema,
835                                ..
836                            },
837                        ..
838                    },
839                ..
840            }) => {
841                let schema = match schema_for {
842                    ExplainSinkSchemaFor::Key => key_format
843                        .and_then(|f| match f {
844                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
845                            _ => None,
846                        })
847                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
848                    ExplainSinkSchemaFor::Value => value_schema,
849                };
850
851                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
852                    sink_from: sink.from,
853                    json_schema: schema,
854                }))
855            }
856            _ => bail_unsupported!(
857                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
858            ),
859        },
860        _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
861    }
862}
863
864pub fn plan_explain_pushdown(
865    scx: &StatementContext,
866    statement: ExplainPushdownStatement<Aug>,
867    params: &Params,
868) -> Result<Plan, PlanError> {
869    scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
870    let explainee = plan_explainee(scx, statement.explainee, params)?;
871    Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
872}
873
874pub fn plan_explain_analyze_object(
875    scx: &StatementContext,
876    statement: ExplainAnalyzeObjectStatement<Aug>,
877    params: &Params,
878) -> Result<Plan, PlanError> {
879    let explainee_name = statement
880        .explainee
881        .name()
882        .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
883        .full_name_str();
884    let explainee = plan_explainee(scx, statement.explainee, params)?;
885
886    match explainee {
887        plan::Explainee::Index(_index_id) => (),
888        plan::Explainee::MaterializedView(_item_id) => (),
889        _ => {
890            return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
891        }
892    };
893
894    // generate SQL query
895
896    /* WITH {CTEs}
897       SELECT REPEAT(' ', nesting * 2) || operator AS operator
898             {columns}
899        FROM      mz_introspection.mz_lir_mapping mlm
900             JOIN {from} USING (lir_id)
901             JOIN mz_introspection.mz_mappable_objects mo
902               ON (mlm.global_id = mo.global_id)
903       WHERE     mo.name = '{plan.explainee_name}'
904             AND {predicates}
905       ORDER BY lir_id DESC
906    */
907    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
908    let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
909    let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
910    let mut predicates = vec![format!("mo.name = '{}'", explainee_name)];
911    let mut order_by = vec!["mlm.lir_id DESC"];
912
913    match statement.properties {
914        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
915            properties,
916            skew,
917        }) => {
918            let mut worker_id = None;
919            let mut seen_properties = BTreeSet::new();
920            for property in properties {
921                // handle each property only once (belt and suspenders)
922                if !seen_properties.insert(property) {
923                    continue;
924                }
925
926                match property {
927                    ExplainAnalyzeComputationProperty::Memory => {
928                        ctes.push((
929                            "summary_memory",
930                            r#"
931  SELECT mlm.global_id AS global_id,
932         mlm.lir_id AS lir_id,
933         SUM(mas.size) AS total_memory,
934         SUM(mas.records) AS total_records,
935         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
936         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
937    FROM            mz_introspection.mz_lir_mapping mlm
938         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
939               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
940                 ON (mas.operator_id = valid_id)
941GROUP BY mlm.global_id, mlm.lir_id"#,
942                        ));
943                        from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");
944
945                        if skew {
946                            ctes.push((
947                                "per_worker_memory",
948                                r#"
949  SELECT mlm.global_id AS global_id,
950         mlm.lir_id AS lir_id,
951         mas.worker_id AS worker_id,
952         SUM(mas.size) AS worker_memory,
953         SUM(mas.records) AS worker_records
954    FROM            mz_introspection.mz_lir_mapping mlm
955         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
956               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
957                 ON (mas.operator_id = valid_id)
958GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
959                            ));
960                            from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");
961
962                            if let Some(worker_id) = worker_id {
963                                predicates.push(format!("pwm.worker_id = {worker_id}"));
964                            } else {
965                                worker_id = Some("pwm.worker_id");
966                                columns.push("pwm.worker_id AS worker_id");
967                                order_by.push("worker_id");
968                            }
969
970                            columns.extend([
971                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
972                                "pg_size_pretty(pwm.worker_memory) AS worker_memory",
973                                "pg_size_pretty(sm.avg_memory) AS avg_memory",
974                                "pg_size_pretty(sm.total_memory) AS total_memory",
975                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
976                                "pwm.worker_records AS worker_records",
977                                "sm.avg_records AS avg_records",
978                                "sm.total_records AS total_records",
979                            ]);
980                        } else {
981                            columns.extend([
982                                "pg_size_pretty(sm.total_memory) AS total_memory",
983                                "sm.total_records AS total_records",
984                            ]);
985                        }
986                    }
987                    ExplainAnalyzeComputationProperty::Cpu => {
988                        ctes.push((
989                            "summary_cpu",
990                            r#"
991  SELECT mlm.global_id AS global_id,
992         mlm.lir_id AS lir_id,
993         SUM(mse.elapsed_ns) AS total_ns,
994         CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
995    FROM            mz_introspection.mz_lir_mapping mlm
996         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
997               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
998                 ON (mse.id = valid_id)
999GROUP BY mlm.global_id, mlm.lir_id"#,
1000                        ));
1001                        from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");
1002
1003                        if skew {
1004                            ctes.push((
1005                                "per_worker_cpu",
1006                                r#"
1007  SELECT mlm.global_id AS global_id,
1008         mlm.lir_id AS lir_id,
1009         mse.worker_id AS worker_id,
1010         SUM(mse.elapsed_ns) AS worker_ns
1011    FROM            mz_introspection.mz_lir_mapping mlm
1012         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1013               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1014                 ON (mse.id = valid_id)
1015GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1016                            ));
1017                            from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");
1018
1019                            if let Some(worker_id) = worker_id {
1020                                predicates.push(format!("pwc.worker_id = {worker_id}"));
1021                            } else {
1022                                worker_id = Some("pwc.worker_id");
1023                                columns.push("pwc.worker_id AS worker_id");
1024                                order_by.push("worker_id");
1025                            }
1026
1027                            columns.extend([
1028                                "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
1029                                "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
1030                                "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
1031                            ]);
1032                        }
1033                        columns.push(
1034                            "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
1035                        );
1036                    }
1037                }
1038            }
1039        }
1040        ExplainAnalyzeProperty::Hints => {
1041            columns.extend([
1042                "megsa.levels AS levels",
1043                "megsa.to_cut AS to_cut",
1044                "megsa.hint AS hint",
1045                "pg_size_pretty(megsa.savings) AS savings",
1046            ]);
1047            from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
1048            "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
1049             mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
1050        }
1051    }
1052
1053    from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");
1054
1055    let ctes = if !ctes.is_empty() {
1056        format!(
1057            "WITH {}",
1058            separated(
1059                ",\n",
1060                ctes.iter()
1061                    .map(|(name, defn)| format!("{name} AS ({defn})"))
1062            )
1063        )
1064    } else {
1065        String::new()
1066    };
1067    let columns = separated(", ", columns);
1068    let from = separated(" ", from);
1069    let predicates = separated(" AND ", predicates);
1070    let order_by = separated(", ", order_by);
1071    let query = format!(
1072        r#"{ctes}
1073SELECT {columns}
1074FROM {from}
1075WHERE {predicates}
1076ORDER BY {order_by}"#
1077    );
1078
1079    if statement.as_sql {
1080        let rows = vec![Row::pack_slice(&[Datum::String(
1081            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1082                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1083            })?,
1084        )])];
1085        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1086
1087        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1088    } else {
1089        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1090        show_select.plan()
1091    }
1092}
1093
/// Plans `EXPLAIN ANALYZE ... FOR CLUSTER`, generating a SQL query over the
/// `mz_introspection` relations that summarizes memory and/or CPU usage for
/// every mappable object on the current cluster.
///
/// When `AS SQL` is requested, the generated query text is returned as a
/// one-row result instead of being planned and run.
pub fn plan_explain_analyze_cluster(
    scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
    _params: &Params,
) -> Result<Plan, PlanError> {
    // Result columns produced, depending on the requested properties:
    //
    // object string
    // worker_id uint64        (if           skew)
    // memory_ratio numeric    (if memory && skew)
    // worker_memory string    (if memory && skew)
    // avg_memory string       (if memory && skew)
    // total_memory string     (if memory)
    // records_ratio numeric   (if memory && skew)
    // worker_records          (if memory && skew)
    // avg_records numeric     (if memory && skew)
    // total_records numeric   (if memory)
    // cpu_ratio numeric       (if cpu    && skew)
    // worker_elapsed interval (if cpu    && skew)
    // avg_elapsed interval    (if cpu    && skew)
    // total_elapsed interval  (if cpu)

    /* WITH {CTEs}
       SELECT mo.name AS object
             {columns}
        FROM mz_introspection.mz_mappable_objects mo
             {from}
       WHERE {predicates}
       ORDER BY {order_by}, mo.name DESC
    */
    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
    let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
    let mut predicates = vec![];
    let mut order_by = vec![];

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
    // SQL expression naming the worker_id column; set by the first skew
    // property so later skew properties can join against it.
    let mut worker_id = None;
    let mut seen_properties = BTreeSet::new();
    for property in properties {
        // handle each property only once (belt and suspenders)
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            ExplainAnalyzeComputationProperty::Memory => {
                if skew {
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        // join condition if we're showing skew for more than one property
                        predicates.push(format!("om.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("om.worker_id");
                        columns.push("om.worker_id AS worker_id");
                        set_worker_id = true; // we'll add ourselves to `order_by` later
                    };

                    // computes the average memory per LIR operator (for per operator ratios)
                    ctes.push((
                    "per_operator_memory_summary",
                    r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mas.size) AS total_memory,
       SUM(mas.records) AS total_records,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
FROM        mz_introspection.mz_lir_mapping mlm
 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
         ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                ));

                    // computes the memory per worker in a per operator way
                    ctes.push((
                    "per_operator_memory_per_worker",
                    r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mas.worker_id AS worker_id,
       SUM(mas.size) AS worker_memory,
       SUM(mas.records) AS worker_records
FROM        mz_introspection.mz_lir_mapping mlm
 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
         ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                    ));

                    // computes memory ratios per worker per operator
                    ctes.push((
                    "per_operator_memory_ratios",
                    r#"
SELECT pompw.global_id AS global_id,
       pompw.lir_id AS lir_id,
       pompw.worker_id AS worker_id,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
  FROM      per_operator_memory_per_worker pompw
       JOIN per_operator_memory_summary poms
         USING (global_id, lir_id)
"#,
                    ));

                    // summarizes each object, per worker
                    ctes.push((
                        "object_memory",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.worker_id AS worker_id,
       MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
       MAX(pomr.records_ratio) AS max_operator_records_ratio,
       SUM(pompw.worker_memory) AS worker_memory,
       SUM(pompw.worker_records) AS worker_records
FROM        per_operator_memory_per_worker pompw
     JOIN   per_operator_memory_ratios pomr
     USING (global_id, worker_id, lir_id)
GROUP BY pompw.global_id, pompw.worker_id
"#,
                    ));

                    // summarizes each object across all workers (totals and per-worker averages)
                    ctes.push(("object_average_memory", r#"
SELECT om.global_id AS global_id,
       SUM(om.worker_memory) AS total_memory,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
       SUM(om.worker_records) AS total_records,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
  FROM object_memory om
GROUP BY om.global_id"#));

                    from.push("LEFT JOIN object_memory om USING (global_id)");
                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");

                    columns.extend([
                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
                        "pg_size_pretty(om.worker_memory) AS worker_memory",
                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
                        "pg_size_pretty(oam.total_memory) AS total_memory",
                        "om.max_operator_records_ratio AS max_operator_records_ratio",
                        "om.worker_records AS worker_records",
                        "oam.avg_records AS avg_records",
                        "oam.total_records AS total_records",
                    ]);

                    order_by.extend([
                        "max_operator_memory_ratio DESC",
                        "max_operator_records_ratio DESC",
                        "worker_memory DESC",
                        "worker_records DESC",
                    ]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // no skew, so just compute totals
                    ctes.push((
                        "per_operator_memory_totals",
                        r#"
    SELECT mlm.global_id AS global_id,
           mlm.lir_id AS lir_id,
           SUM(mas.size) AS total_memory,
           SUM(mas.records) AS total_records
    FROM        mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
           JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
             ON (mas.operator_id = valid_id)
    GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_memory_totals",
                        r#"
SELECT pomt.global_id AS global_id,
       SUM(pomt.total_memory) AS total_memory,
       SUM(pomt.total_records) AS total_records
FROM per_operator_memory_totals pomt
GROUP BY pomt.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
                    columns.extend([
                        "pg_size_pretty(omt.total_memory) AS total_memory",
                        "omt.total_records AS total_records",
                    ]);
                    order_by.extend(["total_memory DESC", "total_records DESC"]);
                }
            }
            ExplainAnalyzeComputationProperty::Cpu => {
                if skew {
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        // join condition if we're showing skew for more than one property
                        predicates.push(format!("oc.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("oc.worker_id");
                        columns.push("oc.worker_id AS worker_id");
                        set_worker_id = true; // we'll add ourselves to `order_by` later
                    };

                    // computes the average CPU time per LIR operator (for per operator ratios)
                    ctes.push((
    "per_operator_cpu_summary",
    r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mse.elapsed_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
FROM       mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
        ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
));

                    // computes the CPU per worker in a per operator way
                    ctes.push((
                        "per_operator_cpu_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mse.worker_id AS worker_id,
       SUM(mse.elapsed_ns) AS worker_ns
FROM       mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
        ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                    ));

                    // computes CPU ratios per worker per operator
                    ctes.push((
                        "per_operator_cpu_ratios",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.lir_id AS lir_id,
       pocpw.worker_id AS worker_id,
       CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
FROM      per_operator_cpu_per_worker pocpw
     JOIN per_operator_cpu_summary pocs
     USING (global_id, lir_id)
"#,
                    ));

                    // summarizes each object, per worker
                    ctes.push((
                        "object_cpu",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.worker_id AS worker_id,
       MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
       SUM(pocpw.worker_ns) AS worker_ns
FROM      per_operator_cpu_per_worker pocpw
     JOIN per_operator_cpu_ratios pomr
     USING (global_id, worker_id, lir_id)
GROUP BY pocpw.global_id, pocpw.worker_id
"#,
                    ));

                    // summarizes each object across all workers (totals and per-worker averages)
                    ctes.push((
                        "object_average_cpu",
                        r#"
SELECT oc.global_id AS global_id,
       SUM(oc.worker_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
  FROM object_cpu oc
GROUP BY oc.global_id"#,));

                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");

                    columns.extend([
                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
                    ]);

                    order_by.extend(["max_operator_cpu_ratio DESC", "worker_elapsed DESC"]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // no skew, so just compute totals
                    ctes.push((
                        "per_operator_cpu_totals",
                        r#"
    SELECT mlm.global_id AS global_id,
           mlm.lir_id AS lir_id,
           SUM(mse.elapsed_ns) AS total_ns
    FROM        mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
           JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
             ON (mse.id = valid_id)
    GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_cpu_totals",
                        r#"
SELECT poct.global_id AS global_id,
       SUM(poct.total_ns) AS total_ns
FROM per_operator_cpu_totals poct
GROUP BY poct.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
                    columns
                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
                    order_by.extend(["total_elapsed DESC"]);
                }
            }
        }
    }

    // generate SQL query text
    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    let predicates = if !predicates.is_empty() {
        format!("WHERE {}", separated(" AND ", predicates))
    } else {
        String::new()
    };
    // add mo.name last, to break ties only
    order_by.push("mo.name DESC");
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
{predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // `AS SQL`: return the pretty-printed query text as a one-row result.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        show_select.plan()
    }
}
1459
1460pub fn plan_explain_timestamp(
1461    scx: &StatementContext,
1462    explain: ExplainTimestampStatement<Aug>,
1463) -> Result<Plan, PlanError> {
1464    let (format, _verbose_syntax) = match explain.format() {
1465        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1466        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1467        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1468        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1469    };
1470
1471    let raw_plan = {
1472        let query::PlannedRootQuery {
1473            expr: raw_plan,
1474            desc: _,
1475            finishing: _,
1476            scope: _,
1477        } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1478        if raw_plan.contains_parameters()? {
1479            return Err(PlanError::ParameterNotAllowed(
1480                "EXPLAIN TIMESTAMP".to_string(),
1481            ));
1482        }
1483
1484        raw_plan
1485    };
1486    let when = query::plan_as_of(scx, explain.select.as_of)?;
1487
1488    Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1489        format,
1490        raw_plan,
1491        when,
1492    }))
1493}
1494
1495/// Plans and decorrelates a [`Query`]. Like [`query::plan_root_query`], but
1496/// returns an [`MirRelationExpr`], which cannot include correlated expressions.
1497#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."]
1498pub fn plan_query(
1499    scx: &StatementContext,
1500    query: Query<Aug>,
1501    params: &Params,
1502    lifetime: QueryLifetime,
1503) -> Result<query::PlannedRootQuery<MirRelationExpr>, PlanError> {
1504    let query::PlannedRootQuery {
1505        mut expr,
1506        desc,
1507        finishing,
1508        scope,
1509    } = query::plan_root_query(scx, query, lifetime)?;
1510    expr.bind_parameters(scx, lifetime, params)?;
1511
1512    Ok(query::PlannedRootQuery {
1513        // No metrics passed! One more reason not to use this deprecated function.
1514        expr: expr.lower(scx.catalog.system_vars(), None)?,
1515        desc,
1516        finishing,
1517        scope,
1518    })
1519}
1520
1521generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1522
/// Computes the [`StatementDesc`] (output column names and types) of a
/// `SUBSCRIBE`.
///
/// Output always begins with `mz_timestamp`; `mz_progressed` follows when the
/// `PROGRESS` option is set. The remaining columns depend on the output mode
/// (`DIFFS`/`WITHIN TIMESTAMP ORDER BY` vs. `ENVELOPE UPSERT`/`DEBEZIUM`).
pub fn describe_subscribe(
    scx: &StatementContext,
    stmt: SubscribeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // Description of the underlying relation, whether a named catalog item
    // or an inline query.
    let relation_desc = match stmt.relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            item.desc(&scx.catalog.resolve_full_name(item.name()))?
                .into_owned()
        }
        SubscribeRelation::Query(query) => {
            let query::PlannedRootQuery { desc, .. } =
                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
            desc
        }
    };
    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
    let progress = progress.unwrap_or(false);
    // Every SUBSCRIBE emits each update's timestamp as a numeric with scale
    // zero (an integer-valued numeric).
    let mut desc = RelationDesc::builder().with_column(
        "mz_timestamp",
        SqlScalarType::Numeric {
            max_scale: Some(NumericMaxScale::ZERO),
        }
        .nullable(false),
    );
    if progress {
        desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
    }

    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
    match stmt.output {
        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
            desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
            for (name, mut ty) in relation_desc.into_iter() {
                // With PROGRESS enabled, data columns must be nullable since
                // progress rows carry no data.
                if progress {
                    ty.nullable = true;
                }
                desc = desc.with_column(name, ty);
            }
        }
        SubscribeOutput::EnvelopeUpsert { key_columns }
        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
            desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
            let key_columns = key_columns
                .into_iter()
                .map(normalize::column_name)
                .collect_vec();
            let mut before_values_desc = RelationDesc::builder();
            let mut after_values_desc = RelationDesc::builder();

            // Add the key columns in the order that they're specified.
            for column_name in &key_columns {
                let mut column_ty = relation_desc
                    .get_by_name(column_name)
                    .map(|(_pos, ty)| ty.clone())
                    .ok_or_else(|| PlanError::UnknownColumn {
                        table: None,
                        column: column_name.clone(),
                        similar: Box::new([]),
                    })?;
                if progress {
                    column_ty.nullable = true;
                }
                desc = desc.with_column(column_name, column_ty);
            }

            // Then add the remaining columns in the order from the original
            // table, filtering out the key columns since we added those above.
            for (mut name, mut ty) in relation_desc
                .into_iter()
                .filter(|(name, _ty)| !key_columns.contains(name))
            {
                // Non-key value columns are made nullable unconditionally
                // (values may be absent, e.g. on deletion).
                ty.nullable = true;
                before_values_desc =
                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
                // DEBEZIUM disambiguates the two sides with before_/after_
                // prefixes; UPSERT keeps the original column names.
                if debezium {
                    name = format!("after_{}", name).into();
                }
                after_values_desc = after_values_desc.with_column(name, ty);
            }

            // Only DEBEZIUM output includes the before_* columns.
            if debezium {
                desc = desc.concat(before_values_desc);
            }
            desc = desc.concat(after_values_desc);
        }
    }
    Ok(StatementDesc::new(Some(desc.finish())))
}
1612
/// Plans a `SUBSCRIBE` statement, producing a [`Plan::Subscribe`].
///
/// The subscription source is either a named catalog item (subscribed to by
/// its global id) or an inline query (planned and lowered to MIR). The
/// `AS OF`/`UP TO` bounds and the output mode (`DIFFS`, `ENVELOPE UPSERT`,
/// `ENVELOPE DEBEZIUM`, `WITHIN TIMESTAMP ORDER BY`) are planned here too.
pub fn plan_subscribe(
    scx: &StatementContext,
    SubscribeStatement {
        relation,
        options,
        as_of,
        up_to,
        output,
    }: SubscribeStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<Plan, PlanError> {
    let (from, desc, scope) = match relation {
        SubscribeRelation::Name(name) => {
            let entry = scx.get_item_by_resolved_name(&name)?;
            // Items without a relation description cannot be subscribed to;
            // report the offending item's type.
            let desc = match entry.desc(&scx.catalog.resolve_full_name(entry.name())) {
                Ok(desc) => desc,
                Err(..) => sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    entry.item_type(),
                ),
            };
            let item_name = match name {
                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
                _ => None,
            };
            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
            (
                SubscribeFrom::Id(entry.global_id()),
                desc.into_owned(),
                scope,
            )
        }
        SubscribeRelation::Query(query) => {
            #[allow(deprecated)] // TODO(aalexandrov): Use HirRelationExpr in Subscribe
            let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?;
            // There's no way to apply finishing operations to a `SUBSCRIBE` directly, so the
            // finishing should have already been turned into a `TopK` by
            // `plan_query` / `plan_root_query`, upon seeing the `QueryLifetime::Subscribe`.
            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
                &query.finishing,
                query.desc.arity()
            ));
            let desc = query.desc.clone();
            (
                SubscribeFrom::Query {
                    expr: query.expr,
                    desc: query.desc,
                },
                desc,
                query.scope,
            )
        }
    };

    // Plan the AS OF and optional UP TO temporal bounds.
    let when = query::plan_as_of(scx, as_of)?;
    let up_to = up_to
        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
        .transpose()?;

    // Expression context for planning the key/order-by expressions in the
    // output clauses below; aggregates and window functions are disallowed.
    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
    let ecx = ExprContext {
        qcx: &qcx,
        name: "",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
    let output = match output {
        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
        SubscribeOutput::EnvelopeUpsert { key_columns } => {
            // Treat each key column as an ORDER BY item so the ORDER BY
            // planning machinery can resolve it against the output columns.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE UPSERT KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            // Keys must be plain column references: a computed expression
            // would have produced extra map expressions.
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
            }
            plan::SubscribeOutput::EnvelopeUpsert {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::EnvelopeDebezium { key_columns } => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
            // Same strategy as ENVELOPE UPSERT above: resolve key columns via
            // the ORDER BY planner and reject computed expressions.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE DEBEZIUM KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
            }
            plan::SubscribeOutput::EnvelopeDebezium {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
            // The ORDER BY may reference mz_diff, which is placed at output
            // position 0; shift the relation's columns over by one.
            let mz_diff = "mz_diff".into();
            let output_columns = std::iter::once((0, &mz_diff))
                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
                .collect_vec();
            match query::plan_order_by_exprs(
                &ExprContext {
                    name: "WITHIN TIMESTAMP ORDER BY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            ) {
                Err(PlanError::UnknownColumn {
                    table: None,
                    column,
                    similar: _,
                }) if &column == &mz_diff => {
                    // mz_diff is being used in an expression. Since mz_diff isn't part of the table
                    // it looks like an unknown column. Instead, return a better error
                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                }
                Err(e) => return Err(e),
                Ok((order_by, map_exprs)) => {
                    // As with the envelope clauses, computed ORDER BY
                    // expressions are not supported here.
                    if !map_exprs.is_empty() {
                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                    }

                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
                }
            }
        }
    };

    let SubscribeOptionExtracted {
        progress, snapshot, ..
    } = options.try_into()?;
    // SNAPSHOT defaults to true (emit the initial contents); PROGRESS
    // defaults to false (no progress messages).
    Ok(Plan::Subscribe(SubscribePlan {
        from,
        when,
        up_to,
        with_snapshot: snapshot.unwrap_or(true),
        copy_to,
        emit_progress: progress.unwrap_or(false),
        output,
    }))
}
1786
1787pub fn describe_copy_from_table(
1788    scx: &StatementContext,
1789    table_name: <Aug as AstInfo>::ItemName,
1790    columns: Vec<Ident>,
1791) -> Result<StatementDesc, PlanError> {
1792    let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1793    Ok(StatementDesc::new(Some(desc)))
1794}
1795
1796pub fn describe_copy_item(
1797    scx: &StatementContext,
1798    object_name: <Aug as AstInfo>::ItemName,
1799    columns: Vec<Ident>,
1800) -> Result<StatementDesc, PlanError> {
1801    let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1802    Ok(StatementDesc::new(Some(desc)))
1803}
1804
1805pub fn describe_copy(
1806    scx: &StatementContext,
1807    CopyStatement {
1808        relation,
1809        direction,
1810        ..
1811    }: CopyStatement<Aug>,
1812) -> Result<StatementDesc, PlanError> {
1813    Ok(match (relation, direction) {
1814        (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1815            describe_copy_item(scx, name, columns)?
1816        }
1817        (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1818            describe_copy_from_table(scx, name, columns)?
1819        }
1820        (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1821        (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1822    }
1823    .with_is_copy())
1824}
1825
1826fn plan_copy_to_expr(
1827    scx: &StatementContext,
1828    select_plan: SelectPlan,
1829    desc: RelationDesc,
1830    to: &Expr<Aug>,
1831    format: CopyFormat,
1832    options: CopyOptionExtracted,
1833) -> Result<Plan, PlanError> {
1834    let conn_id = match options.aws_connection {
1835        Some(conn_id) => CatalogItemId::from(conn_id),
1836        None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1837    };
1838    let connection = scx.get_item(&conn_id).connection()?;
1839
1840    match connection {
1841        mz_storage_types::connections::Connection::Aws(_) => {}
1842        _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1843    }
1844
1845    let format = match format {
1846        CopyFormat::Csv => {
1847            let quote = extract_byte_param_value(options.quote, "quote")?;
1848            let escape = extract_byte_param_value(options.escape, "escape")?;
1849            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1850            S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1851                CopyCsvFormatParams::try_new(
1852                    delimiter,
1853                    quote,
1854                    escape,
1855                    options.header,
1856                    options.null,
1857                )
1858                .map_err(|e| sql_err!("{}", e))?,
1859            ))
1860        }
1861        CopyFormat::Parquet => {
1862            // Validate that the output desc can be formatted as parquet
1863            ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
1864            S3SinkFormat::Parquet
1865        }
1866        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1867        CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1868    };
1869
1870    // Converting the to expr to a HirScalarExpr
1871    let mut to_expr = to.clone();
1872    transform_ast::transform(scx, &mut to_expr)?;
1873    let relation_type = RelationDesc::empty();
1874    let ecx = &ExprContext {
1875        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1876        name: "COPY TO target",
1877        scope: &Scope::empty(),
1878        relation_type: relation_type.typ(),
1879        allow_aggregates: false,
1880        allow_subqueries: false,
1881        allow_parameters: false,
1882        allow_windows: false,
1883    };
1884
1885    let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;
1886
1887    if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1888        sql_bail!(
1889            "MAX FILE SIZE cannot be less than {}",
1890            MIN_S3_SINK_FILE_SIZE
1891        );
1892    }
1893    if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1894        sql_bail!(
1895            "MAX FILE SIZE cannot be greater than {}",
1896            MAX_S3_SINK_FILE_SIZE
1897        );
1898    }
1899
1900    Ok(Plan::CopyTo(CopyToPlan {
1901        select_plan,
1902        desc,
1903        to,
1904        connection: connection.to_owned(),
1905        connection_id: conn_id,
1906        format,
1907        max_file_size: options.max_file_size.as_bytes(),
1908    }))
1909}
1910
/// Plans `COPY ... FROM`, producing a [`Plan::CopyFrom`].
///
/// The source is STDIN or — behind the `ENABLE_COPY_FROM_REMOTE` feature
/// flag — an expression naming a remote location: an S3 source when an AWS
/// connection is supplied, otherwise a plain URL. Format options are
/// validated per the chosen format (TEXT, CSV, or PARQUET; BINARY errors).
fn plan_copy_from(
    scx: &StatementContext,
    target: &CopyTarget<Aug>,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // Rejects an option that is only meaningful in CSV mode when it is set.
    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
        match option {
            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
            None => Ok(()),
        }
    }

    let source = match target {
        CopyTarget::Stdin => CopyFromSource::Stdin,
        CopyTarget::Expr(from) => {
            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;

            // Converting the expr to an HirScalarExpr
            let mut from_expr = from.clone();
            transform_ast::transform(scx, &mut from_expr)?;
            let relation_type = RelationDesc::empty();
            // The source expression is planned against an empty scope: it may
            // not reference columns, aggregates, subqueries, or parameters.
            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "COPY FROM target",
                scope: &Scope::empty(),
                relation_type: relation_type.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;

            // With an AWS connection the expression is an S3 URI; without
            // one it is treated as a generic URL.
            match options.aws_connection {
                Some(conn_id) => {
                    let conn_id = CatalogItemId::from(conn_id);

                    // Validate the connection type is one we expect.
                    let connection = match scx.get_item(&conn_id).connection()? {
                        mz_storage_types::connections::Connection::Aws(conn) => conn,
                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
                    };

                    CopyFromSource::AwsS3 {
                        uri: from,
                        connection,
                        connection_id: conn_id,
                    }
                }
                None => CopyFromSource::Url(from),
            }
        }
        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
    };

    // Build the format-specific parameters, validating option compatibility.
    let params = match format {
        CopyFormat::Text => {
            only_available_with_csv(options.quote, "quote")?;
            only_available_with_csv(options.escape, "escape")?;
            only_available_with_csv(options.header, "HEADER")?;
            // TEXT defaults: tab delimiter and `\N` as the NULL string.
            let delimiter =
                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
            let null = match options.null {
                Some(null) => Cow::from(null),
                None => Cow::from("\\N"),
            };
            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
        }
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            )
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Parquet => CopyFormatParams::Parquet,
    };

    // FILES and PATTERN are mutually exclusive ways to select source objects.
    // NOTE(review): the "must specify one of FILES or PATTERN" message is
    // raised via bail_unsupported!, which may add "not supported" phrasing —
    // confirm the rendered error reads as intended.
    let filter = match (options.files, options.pattern) {
        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
        (Some(files), None) => Some(CopyFromFilter::Files(files)),
        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
        (None, None) => None,
    };

    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
    }

    let table_name_string = table_name.full_name_str();

    // Resolve the destination table and the MFP mapping source columns onto
    // it (presumably filling defaults — see query::plan_copy_from).
    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;

    let Some(mfp) = maybe_mfp else {
        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
    };

    Ok(Plan::CopyFrom(CopyFromPlan {
        target_id: id,
        target_name: table_name_string,
        source,
        columns,
        source_desc,
        mfp,
        params,
        filter,
    }))
}
2031
2032fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2033    match v {
2034        Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2035        Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2036        None => Ok(None),
2037    }
2038}
2039
// The WITH options accepted by COPY. FORMAT, DELIMITER, NULL, ESCAPE, QUOTE,
// and HEADER mirror PostgreSQL's COPY options; AWS CONNECTION, MAX FILE SIZE
// (default 256 MB), FILES, and PATTERN configure the S3-backed
// COPY TO <expr> / COPY FROM <expr> extensions.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
2053
/// Plans a `COPY` statement, dispatching on direction and target:
/// `COPY ... TO STDOUT` (formatted SELECT/SUBSCRIBE output),
/// `COPY ... FROM` (loading into a table), and `COPY ... TO <expr>`
/// (writing query results to S3).
pub fn plan_copy(
    scx: &StatementContext,
    CopyStatement {
        relation,
        direction,
        target,
        options,
    }: CopyStatement<Aug>,
) -> Result<Plan, PlanError> {
    let options = CopyOptionExtracted::try_from(options)?;
    // Parse any user-provided FORMAT option. If not provided, will default to
    // Text for COPY TO STDOUT and COPY FROM STDIN, but will error for COPY TO <expr>.
    let format = options
        .format
        .as_ref()
        .map(|format| match format.to_lowercase().as_str() {
            "text" => Ok(CopyFormat::Text),
            "csv" => Ok(CopyFormat::Csv),
            "binary" => Ok(CopyFormat::Binary),
            "parquet" => Ok(CopyFormat::Parquet),
            _ => sql_bail!("unknown FORMAT: {}", format),
        })
        .transpose()?;

    match (&direction, &target) {
        (CopyDirection::To, CopyTarget::Stdout) => {
            // These Postgres COPY options are accepted by the parser but not
            // yet implemented for COPY TO STDOUT.
            if options.delimiter.is_some() {
                sql_bail!("COPY TO does not support DELIMITER option yet");
            }
            if options.quote.is_some() {
                sql_bail!("COPY TO does not support QUOTE option yet");
            }
            if options.null.is_some() {
                sql_bail!("COPY TO does not support NULL option yet");
            }
            match relation {
                CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
                // Plan the underlying SELECT/SUBSCRIBE, passing the COPY
                // format along as the copy_to parameter.
                CopyRelation::Select(stmt) => Ok(plan_select(
                    scx,
                    stmt,
                    &Params::empty(),
                    Some(format.unwrap_or(CopyFormat::Text)),
                )?),
                CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
                    scx,
                    stmt,
                    &Params::empty(),
                    Some(format.unwrap_or(CopyFormat::Text)),
                )?),
            }
        }
        (CopyDirection::From, target) => match relation {
            CopyRelation::Named { name, columns } => plan_copy_from(
                scx,
                target,
                name,
                columns,
                format.unwrap_or(CopyFormat::Text),
                options,
            ),
            _ => sql_bail!("COPY FROM {} not supported", target),
        },
        (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
            // System users are always allowed to use this feature, even when
            // the flag is disabled, so that we can dogfood for analytics in
            // production environments. The feature is stable enough that we're
            // not worried about it crashing.
            if !scx.catalog.active_role_id().is_system() {
                scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
            }

            // Unlike COPY TO STDOUT, an explicit FORMAT is mandatory here.
            let format = match format {
                Some(inner) => inner,
                _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
            };

            let stmt = match relation {
                CopyRelation::Named { name, columns } => {
                    if !columns.is_empty() {
                        // TODO(mouli): Add support for this
                        sql_bail!(
                            "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
                        );
                    }
                    // Generate a synthetic SELECT query that just gets the table
                    let query = Query {
                        ctes: CteBlock::empty(),
                        body: SetExpr::Table(name),
                        order_by: vec![],
                        limit: None,
                        offset: None,
                    };
                    SelectStatement { query, as_of: None }
                }
                CopyRelation::Select(stmt) => {
                    if !stmt.query.order_by.is_empty() {
                        sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
                    }
                    stmt
                }
                _ => sql_bail!("COPY {} {} not supported", direction, target),
            };

            // Plan the SELECT (no COPY format attached at this stage), then
            // attach the S3 sink configuration.
            let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
            plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
        }
        _ => sql_bail!("COPY {} {} not supported", direction, target),
    }
}