// mz_sql/plan/statement/dml.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data manipulation language (DML).
11//!
12//! This module houses the handlers for statements that manipulate data, like
13//! `INSERT`, `SELECT`, `SUBSCRIBE`, and `COPY`.
14
15use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19use mz_arrow_util::builder::ArrowBuilder;
20use mz_expr::RowSetFinishing;
21use mz_expr::visit::Visit;
22use mz_ore::num::NonNeg;
23use mz_ore::soft_panic_or_log;
24use mz_ore::str::separated;
25use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
26use mz_repr::adt::numeric::NumericMaxScale;
27use mz_repr::bytes::ByteSize;
28use mz_repr::explain::{ExplainConfig, ExplainFormat};
29use mz_repr::optimize::OptimizerFeatureOverrides;
30use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
31use mz_sql_parser::ast::{
32    CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
33    ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
34    ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
35    ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
36    SetExpr, SubscribeOutput, UnresolvedItemName,
37};
38use mz_sql_parser::ident;
39use mz_storage_types::sinks::{
40    KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
41    MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
42};
43
44use crate::ast::display::AstDisplay;
45use crate::ast::{
46    AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
47    DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
48    SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
49    UpdateStatement,
50};
51use crate::catalog::CatalogItemType;
52use crate::names::{Aug, ResolvedItemName};
53use crate::normalize;
54use crate::plan::query::{
55    ExprContext, QueryLifetime, offset_into_value, plan_as_of_or_up_to, plan_expr,
56};
57use crate::plan::scope::Scope;
58use crate::plan::statement::show::ShowSelect;
59use crate::plan::statement::{StatementContext, StatementDesc, ddl};
60use crate::plan::{
61    self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
62    ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
63};
64use crate::plan::{
65    CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
66    QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
67};
68use crate::plan::{CopyFromSource, with_options};
69use crate::session::vars::{
70    self, DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF, ENABLE_COPY_FROM_REMOTE,
71};
72
73// TODO(benesch): currently, describing a `SELECT` or `INSERT` query
74// plans the whole query to determine its shape and parameter types,
75// and then throws away that plan. If we were smarter, we'd stash that
76// plan somewhere so we don't have to recompute it when the query is
77// executed.
78
79pub fn describe_insert(
80    scx: &StatementContext,
81    InsertStatement {
82        table_name,
83        columns,
84        source,
85        returning,
86    }: InsertStatement<Aug>,
87) -> Result<StatementDesc, PlanError> {
88    let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
89    let desc = if returning.expr.is_empty() {
90        None
91    } else {
92        Some(returning.desc)
93    };
94    Ok(StatementDesc::new(desc))
95}
96
97pub fn plan_insert(
98    scx: &StatementContext,
99    InsertStatement {
100        table_name,
101        columns,
102        source,
103        returning,
104    }: InsertStatement<Aug>,
105    params: &Params,
106) -> Result<Plan, PlanError> {
107    let (id, mut expr, returning) =
108        query::plan_insert_query(scx, table_name, columns, source, returning)?;
109    expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
110    let returning = returning
111        .expr
112        .into_iter()
113        .map(|mut expr| {
114            expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
115            expr.lower_uncorrelated(scx.catalog.system_vars())
116        })
117        .collect::<Result<Vec<_>, _>>()?;
118
119    Ok(Plan::Insert(InsertPlan {
120        id,
121        values: expr,
122        returning,
123    }))
124}
125
126pub fn describe_delete(
127    scx: &StatementContext,
128    stmt: DeleteStatement<Aug>,
129) -> Result<StatementDesc, PlanError> {
130    query::plan_delete_query(scx, stmt)?;
131    Ok(StatementDesc::new(None))
132}
133
134pub fn plan_delete(
135    scx: &StatementContext,
136    stmt: DeleteStatement<Aug>,
137    params: &Params,
138) -> Result<Plan, PlanError> {
139    let rtw_plan = query::plan_delete_query(scx, stmt)?;
140    plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
141}
142
143pub fn describe_update(
144    scx: &StatementContext,
145    stmt: UpdateStatement<Aug>,
146) -> Result<StatementDesc, PlanError> {
147    query::plan_update_query(scx, stmt)?;
148    Ok(StatementDesc::new(None))
149}
150
151pub fn plan_update(
152    scx: &StatementContext,
153    stmt: UpdateStatement<Aug>,
154    params: &Params,
155) -> Result<Plan, PlanError> {
156    let rtw_plan = query::plan_update_query(scx, stmt)?;
157    plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
158}
159
160pub fn plan_read_then_write(
161    scx: &StatementContext,
162    kind: MutationKind,
163    params: &Params,
164    query::ReadThenWritePlan {
165        id,
166        mut selection,
167        finishing,
168        assignments,
169    }: query::ReadThenWritePlan,
170) -> Result<Plan, PlanError> {
171    selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
172    let mut assignments_outer = BTreeMap::new();
173    for (idx, mut set) in assignments {
174        set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
175        let set = set.lower_uncorrelated(scx.catalog.system_vars())?;
176        assignments_outer.insert(idx, set);
177    }
178
179    Ok(Plan::ReadThenWrite(ReadThenWritePlan {
180        id,
181        selection,
182        finishing,
183        assignments: assignments_outer,
184        kind,
185        returning: Vec::new(),
186    }))
187}
188
189pub fn describe_select(
190    scx: &StatementContext,
191    stmt: SelectStatement<Aug>,
192) -> Result<StatementDesc, PlanError> {
193    if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
194        return Ok(StatementDesc::new(Some(desc)));
195    }
196
197    let query::PlannedRootQuery { desc, .. } =
198        query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
199    Ok(StatementDesc::new(Some(desc)))
200}
201
202pub fn plan_select(
203    scx: &StatementContext,
204    select: SelectStatement<Aug>,
205    params: &Params,
206    copy_to: Option<CopyFormat>,
207) -> Result<Plan, PlanError> {
208    if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
209        return Ok(Plan::SideEffectingFunc(f));
210    }
211
212    let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
213    Ok(Plan::Select(plan))
214}
215
/// Plans a one-shot `SELECT` into a [`SelectPlan`] plus the [`RelationDesc`]
/// describing its output columns.
///
/// Binds `params` into the query body and into the `LIMIT`/`OFFSET`
/// expressions of the finishing, which must reduce to constants here.
/// Errors if the top-level `LIMIT` is not constant, and (behind a feature
/// flag) if `AS OF` is combined with unmaterializable functions other than
/// temporal ones.
fn plan_select_inner(
    scx: &StatementContext,
    select: SelectStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<(SelectPlan, RelationDesc), PlanError> {
    // Plan the AS OF clause (cloned, since `select` is stored in the plan below).
    let when = query::plan_as_of(scx, select.as_of.clone())?;
    let lifetime = QueryLifetime::OneShot;
    let query::PlannedRootQuery {
        mut expr,
        desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
    expr.bind_parameters(scx, lifetime, params)?;

    // OFFSET clauses in `expr` should become constants with the above binding of parameters.
    // Let's check this and simplify them to literals.
    expr.try_visit_mut_pre(&mut |expr| {
        if let HirRelationExpr::TopK { offset, .. } = expr {
            let offset_value = offset_into_value(offset.take())?;
            *offset = HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64);
        }
        Ok::<(), PlanError>(())
    })?;
    // (We don't need to simplify LIMIT clauses in `expr`, because we can handle non-constant
    // expressions there. If they happen to be simplifiable to literals, then the optimizer will do
    // so later.)

    // We need to concretize the `limit` and `offset` of the RowSetFinishing, so that we go from
    // `RowSetFinishing<HirScalarExpr, HirScalarExpr>` to `RowSetFinishing`.
    // This involves binding parameters and evaluating each expression to a number.
    // (This should be possible even for `limit` here, because we are at the top level of a SELECT,
    // so this `limit` has to be a constant.)
    let limit = match finishing.limit {
        None => None,
        Some(mut limit) => {
            limit.bind_parameters(scx, lifetime, params)?;
            // TODO: Call `try_into_literal_int64` instead of `as_literal`.
            let Some(limit) = limit.as_literal() else {
                sql_bail!(
                    "Top-level LIMIT must be a constant expression, got {}",
                    limit
                )
            };
            match limit {
                // `LIMIT NULL` means no limit at all.
                Datum::Null => None,
                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
                _ => {
                    // Earlier planning should have rejected any other literal.
                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
                    sql_bail!("LIMIT must be a non-negative INT or NULL")
                }
            }
        }
    };
    let offset = {
        let mut offset = finishing.offset.clone();
        offset.bind_parameters(scx, lifetime, params)?;
        let offset = offset_into_value(offset.take())?;
        offset
            .try_into()
            .expect("checked in offset_into_value that it is not negative")
    };

    // Unmaterializable functions are evaluated based on various information (e.g., the catalog)
    // during sequencing, without taking AS OF into account. This would be hard to fix, so for now
    // we just disallow AS OF when there is an unmaterializable function in a query (except mz_now).
    if scx.is_feature_flag_enabled(&DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF)
        && select.as_of.is_some()
        && expr.contains_unmaterializable_except_temporal()?
    {
        bail_unsupported!("unmaterializable function (except `mz_now`) in an AS OF query");
    }

    let plan = SelectPlan {
        source: expr,
        when,
        finishing: RowSetFinishing {
            limit,
            offset,
            project: finishing.project,
            order_by: finishing.order_by,
        },
        copy_to,
        // Keep the original AST around, e.g. for plan insights.
        select: Some(Box::new(select)),
    };

    Ok((plan, desc))
}
305
306pub fn describe_explain_plan(
307    scx: &StatementContext,
308    explain: ExplainPlanStatement<Aug>,
309) -> Result<StatementDesc, PlanError> {
310    let mut relation_desc = RelationDesc::builder();
311
312    match explain.stage() {
313        ExplainStage::RawPlan => {
314            let name = "Raw Plan";
315            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
316        }
317        ExplainStage::DecorrelatedPlan => {
318            let name = "Decorrelated Plan";
319            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
320        }
321        ExplainStage::LocalPlan => {
322            let name = "Locally Optimized Plan";
323            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
324        }
325        ExplainStage::GlobalPlan => {
326            let name = "Optimized Plan";
327            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
328        }
329        ExplainStage::PhysicalPlan => {
330            let name = "Physical Plan";
331            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
332        }
333        ExplainStage::Trace => {
334            relation_desc = relation_desc
335                .with_column("Time", SqlScalarType::UInt64.nullable(false))
336                .with_column("Path", SqlScalarType::String.nullable(false))
337                .with_column("Plan", SqlScalarType::String.nullable(false));
338        }
339        ExplainStage::PlanInsights => {
340            let name = "Plan Insights";
341            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
342        }
343    };
344    let relation_desc = relation_desc.finish();
345
346    Ok(
347        StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
348            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
349            _ => vec![],
350        }),
351    )
352}
353
354pub fn describe_explain_pushdown(
355    scx: &StatementContext,
356    statement: ExplainPushdownStatement<Aug>,
357) -> Result<StatementDesc, PlanError> {
358    let relation_desc = RelationDesc::builder()
359        .with_column("Source", SqlScalarType::String.nullable(false))
360        .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
361        .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
362        .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
363        .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
364        .finish();
365
366    Ok(
367        StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
368            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
369            _ => vec![],
370        }),
371    )
372}
373
/// Describes the output shape of an `EXPLAIN ANALYZE ... <object>` statement.
///
/// With `AS SQL`, the result is a single `SQL` column. Otherwise the columns
/// depend on the requested properties (memory/CPU/hints) and on whether
/// per-worker `SKEW` detail was requested; the column order below is the
/// output contract.
pub fn describe_explain_analyze_object(
    _scx: &StatementContext,
    statement: ExplainAnalyzeObjectStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    match statement.properties {
        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
            properties,
            skew,
        }) => {
            let mut relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(false));

            // With SKEW, rows are broken out per worker.
            if skew {
                relation_desc =
                    relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
            }

            let mut seen_properties = BTreeSet::new();
            for property in properties {
                // handle each property only once (belt and suspenders)
                if !seen_properties.insert(property) {
                    continue;
                }

                match property {
                    // Memory with skew: ratios and per-worker/avg/total breakdowns.
                    ExplainAnalyzeComputationProperty::Memory if skew => {
                        let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                        relation_desc = relation_desc
                            .with_column("memory_ratio", numeric.clone())
                            .with_column("worker_memory", SqlScalarType::String.nullable(true))
                            .with_column("avg_memory", SqlScalarType::String.nullable(true))
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column("records_ratio", numeric.clone())
                            .with_column("worker_records", numeric.clone())
                            .with_column("avg_records", numeric.clone())
                            .with_column("total_records", numeric);
                    }
                    // Memory without skew: only the totals.
                    ExplainAnalyzeComputationProperty::Memory => {
                        relation_desc = relation_desc
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column(
                                "total_records",
                                SqlScalarType::Numeric { max_scale: None }.nullable(true),
                            );
                    }
                    // CPU: skew adds ratio and per-worker/avg columns;
                    // `total_elapsed` is always present.
                    ExplainAnalyzeComputationProperty::Cpu => {
                        if skew {
                            relation_desc = relation_desc
                                .with_column(
                                    "cpu_ratio",
                                    SqlScalarType::Numeric { max_scale: None }.nullable(true),
                                )
                                .with_column(
                                    "worker_elapsed",
                                    SqlScalarType::Interval.nullable(true),
                                )
                                .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
                        }
                        relation_desc = relation_desc
                            .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
                    }
                }
            }

            let relation_desc = relation_desc.finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
        // HINTS: fixed five-column shape.
        ExplainAnalyzeProperty::Hints => {
            let relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(true))
                .with_column("levels", SqlScalarType::Int64.nullable(true))
                .with_column("to_cut", SqlScalarType::Int64.nullable(true))
                .with_column("hint", SqlScalarType::Float64.nullable(true))
                .with_column("savings", SqlScalarType::String.nullable(true))
                .finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
    }
}
460
461pub fn describe_explain_analyze_cluster(
462    _scx: &StatementContext,
463    statement: ExplainAnalyzeClusterStatement,
464) -> Result<StatementDesc, PlanError> {
465    if statement.as_sql {
466        let relation_desc = RelationDesc::builder()
467            .with_column("SQL", SqlScalarType::String.nullable(false))
468            .finish();
469        return Ok(StatementDesc::new(Some(relation_desc)));
470    }
471
472    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
473
474    let mut relation_desc = RelationDesc::builder()
475        .with_column("object", SqlScalarType::String.nullable(false))
476        .with_column("global_id", SqlScalarType::String.nullable(false));
477
478    if skew {
479        relation_desc =
480            relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
481    }
482
483    let mut seen_properties = BTreeSet::new();
484    for property in properties {
485        // handle each property only once (belt and suspenders)
486        if !seen_properties.insert(property) {
487            continue;
488        }
489
490        match property {
491            ExplainAnalyzeComputationProperty::Memory if skew => {
492                let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
493                relation_desc = relation_desc
494                    .with_column("max_operator_memory_ratio", numeric.clone())
495                    .with_column("worker_memory", SqlScalarType::String.nullable(true))
496                    .with_column("avg_memory", SqlScalarType::String.nullable(true))
497                    .with_column("total_memory", SqlScalarType::String.nullable(true))
498                    .with_column("max_operator_records_ratio", numeric.clone())
499                    .with_column("worker_records", numeric.clone())
500                    .with_column("avg_records", numeric.clone())
501                    .with_column("total_records", numeric);
502            }
503            ExplainAnalyzeComputationProperty::Memory => {
504                relation_desc = relation_desc
505                    .with_column("total_memory", SqlScalarType::String.nullable(true))
506                    .with_column(
507                        "total_records",
508                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
509                    );
510            }
511            ExplainAnalyzeComputationProperty::Cpu if skew => {
512                relation_desc = relation_desc
513                    .with_column(
514                        "max_operator_cpu_ratio",
515                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
516                    )
517                    .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
518                    .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
519                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
520            }
521            ExplainAnalyzeComputationProperty::Cpu => {
522                relation_desc = relation_desc
523                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
524            }
525        }
526    }
527
528    Ok(StatementDesc::new(Some(relation_desc.finish())))
529}
530
531pub fn describe_explain_timestamp(
532    scx: &StatementContext,
533    ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
534) -> Result<StatementDesc, PlanError> {
535    let relation_desc = RelationDesc::builder()
536        .with_column("Timestamp", SqlScalarType::String.nullable(false))
537        .finish();
538
539    Ok(StatementDesc::new(Some(relation_desc))
540        .with_params(describe_select(scx, select)?.param_types))
541}
542
543pub fn describe_explain_schema(
544    _: &StatementContext,
545    ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
546) -> Result<StatementDesc, PlanError> {
547    let relation_desc = RelationDesc::builder()
548        .with_column("Schema", SqlScalarType::String.nullable(false))
549        .finish();
550    Ok(StatementDesc::new(Some(relation_desc)))
551}
552
// Currently, there are two reasons for why a flag should be `Option<bool>` instead of simply
// `bool`:
// - When it's an override of a global feature flag, for example optimizer feature flags. In this
//   case, we need not just false and true, but also None to say "take the value of the global
//   flag".
// - When it's an override of whether SOFT_ASSERTIONS are enabled. For example, when `Arity` is not
//   explicitly given in the EXPLAIN command, then we'd like staging and prod to default to true,
//   but otherwise we'd like to default to false.
//
// This invocation generates `ExplainPlanOptionExtracted`, which gathers the
// WITH-options of an EXPLAIN statement into typed fields with the defaults
// listed below; it is converted to an `ExplainConfig` by the `TryFrom` impl
// below.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
595
/// Converts the extracted `WITH (...)` options of an EXPLAIN statement into
/// an [`ExplainConfig`], applying environment-dependent defaults.
impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
    type Error = PlanError;

    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
        // If `WITH(raw)` is specified, ensure that the config will be as
        // representative for the original plan as possible.
        if v.raw {
            v.raw_plans = true;
            v.raw_syntax = true;
        }

        // Certain config should default to be enabled in release builds running on
        // staging or prod (where SOFT_ASSERTIONS are turned off).
        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();

        Ok(ExplainConfig {
            arity: v.arity.unwrap_or(enable_on_prod),
            cardinality: v.cardinality,
            column_names: v.column_names,
            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
            // Humanization and linear chains are suppressed for raw plans.
            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
            join_impls: v.join_implementations,
            keys: v.keys,
            linear_chains: !v.raw_plans && v.linear_chains,
            no_fast_path: v.no_fast_path,
            no_notices: v.no_notices,
            node_ids: v.node_identifiers,
            non_negative: v.non_negative,
            raw_plans: v.raw_plans,
            raw_syntax: v.raw_syntax,
            verbose_syntax: false,
            redacted: v.redacted,
            subtree_size: v.subtree_size,
            equivalences: v.equivalences,
            timing: v.timing,
            types: v.types,
            // The ones that are initialized with `Default::default()` are not wired up to EXPLAIN.
            features: OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: Default::default(),
                enable_eager_delta_joins: v.enable_eager_delta_joins,
                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
                enable_consolidate_after_union_negate: Default::default(),
                enable_reduce_mfp_fusion: Default::default(),
                enable_cardinality_estimates: Default::default(),
                persist_fast_path_limit: Default::default(),
                reoptimize_imported_views: v.reoptimize_imported_views,
                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse: v
                    .enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: Default::default(),
                enable_dequadratic_eqprop_map: Default::default(),
                enable_eq_classes_withholding_errors: Default::default(),
                enable_fast_path_plan_insights: Default::default(),
                enable_cast_elimination: Default::default(),
                enable_case_literal_transform: Default::default(),
            },
        })
    }
}
657
658fn plan_explainee(
659    scx: &StatementContext,
660    explainee: Explainee<Aug>,
661    params: &Params,
662) -> Result<plan::Explainee, PlanError> {
663    use crate::plan::ExplaineeStatement;
664
665    let is_replan = matches!(
666        explainee,
667        Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
668    );
669
670    let explainee = match explainee {
671        Explainee::View(name) | Explainee::ReplanView(name) => {
672            let item = scx.get_item_by_resolved_name(&name)?;
673            let item_type = item.item_type();
674            if item_type != CatalogItemType::View {
675                sql_bail!("Expected {name} to be a view, not a {item_type}");
676            }
677            match is_replan {
678                true => crate::plan::Explainee::ReplanView(item.id()),
679                false => crate::plan::Explainee::View(item.id()),
680            }
681        }
682        Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
683            let item = scx.get_item_by_resolved_name(&name)?;
684            let item_type = item.item_type();
685            if item_type != CatalogItemType::MaterializedView {
686                sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
687            }
688            match is_replan {
689                true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
690                false => crate::plan::Explainee::MaterializedView(item.id()),
691            }
692        }
693        Explainee::Index(name) | Explainee::ReplanIndex(name) => {
694            let item = scx.get_item_by_resolved_name(&name)?;
695            let item_type = item.item_type();
696            if item_type != CatalogItemType::Index {
697                sql_bail!("Expected {name} to be an index, not a {item_type}");
698            }
699            match is_replan {
700                true => crate::plan::Explainee::ReplanIndex(item.id()),
701                false => crate::plan::Explainee::Index(item.id()),
702            }
703        }
704        Explainee::Select(select, broken) => {
705            let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
706            crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
707        }
708        Explainee::CreateView(mut stmt, broken) => {
709            if stmt.if_exists != IfExistsBehavior::Skip {
710                // If we don't force this parameter to Skip planning will
711                // fail for names that already exist in the catalog. This
712                // can happen even in `Replace` mode if the existing item
713                // has dependencies.
714                stmt.if_exists = IfExistsBehavior::Skip;
715            } else {
716                sql_bail!(
717                    "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
718                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
719                );
720            }
721
722            let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
723                sql_bail!("expected CreateViewPlan plan");
724            };
725
726            crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
727        }
728        Explainee::CreateMaterializedView(mut stmt, broken) => {
729            if stmt.if_exists != IfExistsBehavior::Skip {
730                // If we don't force this parameter to Skip planning will
731                // fail for names that already exist in the catalog. This
732                // can happen even in `Replace` mode if the existing item
733                // has dependencies.
734                stmt.if_exists = IfExistsBehavior::Skip;
735            } else {
736                sql_bail!(
737                    "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
738                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
739                );
740            }
741
742            let Plan::CreateMaterializedView(plan) =
743                ddl::plan_create_materialized_view(scx, *stmt)?
744            else {
745                sql_bail!("expected CreateMaterializedViewPlan plan");
746            };
747
748            crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
749                broken,
750                plan,
751            })
752        }
753        Explainee::CreateIndex(mut stmt, broken) => {
754            if !stmt.if_not_exists {
755                // If we don't force this parameter to true planning will
756                // fail for index items that already exist in the catalog.
757                stmt.if_not_exists = true;
758            } else {
759                sql_bail!(
760                    "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
761                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
762                );
763            }
764
765            let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
766                sql_bail!("expected CreateIndexPlan plan");
767            };
768
769            crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
770        }
771        Explainee::Subscribe(stmt, broken) => {
772            let Plan::Subscribe(plan) = plan_subscribe(scx, *stmt, params, None)? else {
773                sql_bail!("expected SubscribePlan");
774            };
775            crate::plan::Explainee::Statement(ExplaineeStatement::Subscribe { broken, plan })
776        }
777    };
778
779    Ok(explainee)
780}
781
782pub fn plan_explain_plan(
783    scx: &StatementContext,
784    explain: ExplainPlanStatement<Aug>,
785    params: &Params,
786) -> Result<Plan, PlanError> {
787    let (format, verbose_syntax) = match explain.format() {
788        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
789        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
790        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
791        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
792    };
793    let stage = explain.stage();
794
795    // Plan ExplainConfig.
796    let mut config = {
797        let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
798
799        if !scx.catalog.system_vars().persist_stats_filter_enabled() {
800            // If filtering is disabled, explain plans should not include pushdown info.
801            with_options.filter_pushdown = Some(false);
802        }
803
804        ExplainConfig::try_from(with_options)?
805    };
806    config.verbose_syntax = verbose_syntax;
807
808    let explainee = plan_explainee(scx, explain.explainee, params)?;
809
810    Ok(Plan::ExplainPlan(ExplainPlanPlan {
811        stage,
812        format,
813        config,
814        explainee,
815    }))
816}
817
818pub fn plan_explain_schema(
819    scx: &StatementContext,
820    explain_schema: ExplainSinkSchemaStatement<Aug>,
821) -> Result<Plan, PlanError> {
822    let ExplainSinkSchemaStatement {
823        schema_for,
824        // Parser limits to JSON.
825        format: _,
826        mut statement,
827    } = explain_schema;
828
829    // Force the sink's name to one that's guaranteed not to exist, by virtue of
830    // being a non-existent item in a schema under the system's control, so that
831    // `plan_create_sink` doesn't complain about the name already existing.
832    statement.name = Some(UnresolvedItemName::qualified(&[
833        ident!("mz_catalog"),
834        ident!("mz_explain_schema"),
835    ]));
836
837    crate::pure::purify_create_sink_avro_doc_on_options(
838        scx.catalog,
839        *statement.from.item_id(),
840        &mut statement.format,
841    )?;
842
843    match ddl::plan_create_sink(scx, statement)? {
844        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
845            StorageSinkConnection::Kafka(KafkaSinkConnection {
846                format:
847                    KafkaSinkFormat {
848                        key_format,
849                        value_format:
850                            KafkaSinkFormatType::Avro {
851                                schema: value_schema,
852                                ..
853                            },
854                        ..
855                    },
856                ..
857            }) => {
858                let schema = match schema_for {
859                    ExplainSinkSchemaFor::Key => key_format
860                        .and_then(|f| match f {
861                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
862                            _ => None,
863                        })
864                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
865                    ExplainSinkSchemaFor::Value => value_schema,
866                };
867
868                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
869                    sink_from: sink.from,
870                    json_schema: schema,
871                }))
872            }
873            _ => bail_unsupported!(
874                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
875            ),
876        },
877        _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
878    }
879}
880
881pub fn plan_explain_pushdown(
882    scx: &StatementContext,
883    statement: ExplainPushdownStatement<Aug>,
884    params: &Params,
885) -> Result<Plan, PlanError> {
886    scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
887    let explainee = plan_explainee(scx, statement.explainee, params)?;
888    Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
889}
890
/// Plans `EXPLAIN ANALYZE <properties> FOR <object>` for a single index or
/// materialized view.
///
/// Builds a SQL query over `mz_introspection` relations that maps LIR
/// operators (`mz_lir_mapping`) of the named object to per-operator memory
/// and/or CPU statistics (optionally per worker, when `WITH SKEW` is set), or
/// to expected-group-size hints. With `AS SQL`, returns the pretty-printed
/// query text as an immediate one-row result instead of planning it.
pub fn plan_explain_analyze_object(
    scx: &StatementContext,
    statement: ExplainAnalyzeObjectStatement<Aug>,
    params: &Params,
) -> Result<Plan, PlanError> {
    // The generated query filters on the object's catalog name, so anonymous
    // dataflows cannot be analyzed.
    // NOTE(review): this error message reads as truncated — confirm intended text.
    let explainee_name = statement
        .explainee
        .name()
        .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
        .full_name_str();
    let explainee = plan_explainee(scx, statement.explainee, params)?;

    // Only indexes and materialized views have dataflows with LIR mappings.
    // NOTE(review): this error message also reads as truncated — confirm intended text.
    match explainee {
        plan::Explainee::Index(_index_id) => (),
        plan::Explainee::MaterializedView(_item_id) => (),
        _ => {
            return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
        }
    };

    // generate SQL query

    /* WITH {CTEs}
       SELECT REPEAT(' ', nesting * 2) || operator AS operator
             {columns}
        FROM      mz_introspection.mz_lir_mapping mlm
             JOIN {from} USING (lir_id)
             JOIN mz_introspection.mz_mappable_objects mo
               ON (mlm.global_id = mo.global_id)
       WHERE     mo.name = '{plan.explainee_name}'
             AND {predicates}
       ORDER BY lir_id DESC
    */
    // Query fragments accumulated below and assembled into the final text at
    // the end of the function.
    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
    let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
    let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
    let mut predicates = vec![format!("mo.name = '{}'", explainee_name)];
    let mut order_by = vec!["mlm.lir_id DESC"];

    match statement.properties {
        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
            properties,
            skew,
        }) => {
            // Tracks which property (if any) has already contributed the
            // `worker_id` column: the first skewed property adds the column
            // and ORDER BY entry; subsequent ones join on it via a predicate
            // instead, so the output has a single worker_id column.
            let mut worker_id = None;
            let mut seen_properties = BTreeSet::new();
            for property in properties {
                // handle each property only once (belt and suspenders)
                if !seen_properties.insert(property) {
                    continue;
                }

                match property {
                    ExplainAnalyzeComputationProperty::Memory => {
                        // Per-operator memory totals and per-worker averages,
                        // summed over the operator-id range of each LIR node.
                        ctes.push((
                            "summary_memory",
                            r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         SUM(mas.size) AS total_memory,
         SUM(mas.records) AS total_records,
         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
                 ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                        ));
                        from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");

                        if skew {
                            // Per-operator, per-worker memory, for computing
                            // skew ratios against the summary averages.
                            ctes.push((
                                "per_worker_memory",
                                r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         mas.worker_id AS worker_id,
         SUM(mas.size) AS worker_memory,
         SUM(mas.records) AS worker_records
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
                 ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                            ));
                            from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");

                            // First skewed property binds worker_id; later
                            // ones equate theirs with the bound column.
                            if let Some(worker_id) = worker_id {
                                predicates.push(format!("pwm.worker_id = {worker_id}"));
                            } else {
                                worker_id = Some("pwm.worker_id");
                                columns.push("pwm.worker_id AS worker_id");
                                order_by.push("worker_id");
                            }

                            columns.extend([
                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
                                "pg_size_pretty(pwm.worker_memory) AS worker_memory",
                                "pg_size_pretty(sm.avg_memory) AS avg_memory",
                                "pg_size_pretty(sm.total_memory) AS total_memory",
                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
                                "pwm.worker_records AS worker_records",
                                "sm.avg_records AS avg_records",
                                "sm.total_records AS total_records",
                            ]);
                        } else {
                            // No skew: just report per-operator totals.
                            columns.extend([
                                "pg_size_pretty(sm.total_memory) AS total_memory",
                                "sm.total_records AS total_records",
                            ]);
                        }
                    }
                    ExplainAnalyzeComputationProperty::Cpu => {
                        // Per-operator elapsed-time totals and per-worker
                        // averages, analogous to the memory CTEs above.
                        ctes.push((
                            "summary_cpu",
                            r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         SUM(mse.elapsed_ns) AS total_ns,
         CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
                 ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                        ));
                        from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");

                        if skew {
                            ctes.push((
                                "per_worker_cpu",
                                r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         mse.worker_id AS worker_id,
         SUM(mse.elapsed_ns) AS worker_ns
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
                 ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                            ));
                            from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");

                            // Same single-worker_id-column protocol as Memory.
                            if let Some(worker_id) = worker_id {
                                predicates.push(format!("pwc.worker_id = {worker_id}"));
                            } else {
                                worker_id = Some("pwc.worker_id");
                                columns.push("pwc.worker_id AS worker_id");
                                order_by.push("worker_id");
                            }

                            columns.extend([
                                "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
                                "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
                                "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
                            ]);
                        }
                        // Total elapsed is reported with and without skew.
                        columns.push(
                            "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
                        );
                    }
                }
            }
        }
        ExplainAnalyzeProperty::Hints => {
            // Expected-group-size advice per LIR region, joined through the
            // dataflow's global id.
            columns.extend([
                "megsa.levels AS levels",
                "megsa.to_cut AS to_cut",
                "megsa.hint AS hint",
                "pg_size_pretty(megsa.savings) AS savings",
            ]);
            from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
            "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
             mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
        }
    }

    // Join to mappable objects last so the name predicate can apply.
    from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");

    // Assemble the final query text from the accumulated fragments.
    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    let predicates = separated(" AND ", predicates);
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
WHERE {predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // AS SQL: return the pretty-printed query text itself as a one-row,
        // one-column (non-null string) immediate result.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        // Otherwise, plan the generated query like a SHOW-style statement.
        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        show_select.plan()
    }
}
1110
1111pub fn plan_explain_analyze_cluster(
1112    scx: &StatementContext,
1113    statement: ExplainAnalyzeClusterStatement,
1114    _params: &Params,
1115) -> Result<Plan, PlanError> {
1116    // object string
1117    // worker_id uint64        (if           skew)
1118    // memory_ratio numeric    (if memory && skew)
1119    // worker_memory string    (if memory && skew)
1120    // avg_memory string       (if memory && skew)
1121    // total_memory string     (if memory)
1122    // records_ratio numeric   (if memory && skew)
1123    // worker_records          (if memory && skew)
1124    // avg_records numeric     (if memory && skew)
1125    // total_records numeric   (if memory)
1126    // cpu_ratio numeric       (if cpu    && skew)
1127    // worker_elapsed interval (if cpu    && skew)
1128    // avg_elapsed interval    (if cpu    && skew)
1129    // total_elapsed interval  (if cpu)
1130
1131    /* WITH {CTEs}
1132       SELECT mo.name AS object
1133             {columns}
1134        FROM mz_introspection.mz_mappable_objects mo
1135             {from}
1136       WHERE {predicates}
1137       ORDER BY {order_by}, mo.name DESC
1138    */
1139    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
1140    let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
1141    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
1142    let mut predicates = vec![];
1143    let mut order_by = vec![];
1144
1145    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
1146    let mut worker_id = None;
1147    let mut seen_properties = BTreeSet::new();
1148    for property in properties {
1149        // handle each property only once (belt and suspenders)
1150        if !seen_properties.insert(property) {
1151            continue;
1152        }
1153
1154        match property {
1155            ExplainAnalyzeComputationProperty::Memory => {
1156                if skew {
1157                    let mut set_worker_id = false;
1158                    if let Some(worker_id) = worker_id {
1159                        // join condition if we're showing skew for more than one property
1160                        predicates.push(format!("om.worker_id = {worker_id}"));
1161                    } else {
1162                        worker_id = Some("om.worker_id");
1163                        columns.push("om.worker_id AS worker_id");
1164                        set_worker_id = true; // we'll add ourselves to `order_by` later
1165                    };
1166
1167                    // computes the average memory per LIR operator (for per operator ratios)
1168                    ctes.push((
1169                    "per_operator_memory_summary",
1170                    r#"
1171SELECT mlm.global_id AS global_id,
1172       mlm.lir_id AS lir_id,
1173       SUM(mas.size) AS total_memory,
1174       SUM(mas.records) AS total_records,
1175       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
1176       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
1177FROM        mz_introspection.mz_lir_mapping mlm
1178 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1179       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1180         ON (mas.operator_id = valid_id)
1181GROUP BY mlm.global_id, mlm.lir_id"#,
1182                ));
1183
1184                    // computes the memory per worker in a per operator way
1185                    ctes.push((
1186                    "per_operator_memory_per_worker",
1187                    r#"
1188SELECT mlm.global_id AS global_id,
1189       mlm.lir_id AS lir_id,
1190       mas.worker_id AS worker_id,
1191       SUM(mas.size) AS worker_memory,
1192       SUM(mas.records) AS worker_records
1193FROM        mz_introspection.mz_lir_mapping mlm
1194 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1195       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1196         ON (mas.operator_id = valid_id)
1197GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
1198                    ));
1199
1200                    // computes memory ratios per worker per operator
1201                    ctes.push((
1202                    "per_operator_memory_ratios",
1203                    r#"
1204SELECT pompw.global_id AS global_id,
1205       pompw.lir_id AS lir_id,
1206       pompw.worker_id AS worker_id,
1207       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
1208       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
1209  FROM      per_operator_memory_per_worker pompw
1210       JOIN per_operator_memory_summary poms
1211         USING (global_id, lir_id)
1212"#,
1213                    ));
1214
1215                    // summarizes each object, per worker
1216                    ctes.push((
1217                        "object_memory",
1218                        r#"
1219SELECT pompw.global_id AS global_id,
1220       pompw.worker_id AS worker_id,
1221       MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
1222       MAX(pomr.records_ratio) AS max_operator_records_ratio,
1223       SUM(pompw.worker_memory) AS worker_memory,
1224       SUM(pompw.worker_records) AS worker_records
1225FROM        per_operator_memory_per_worker pompw
1226     JOIN   per_operator_memory_ratios pomr
1227     USING (global_id, worker_id, lir_id)
1228GROUP BY pompw.global_id, pompw.worker_id
1229"#,
1230                    ));
1231
1232                    // summarizes each worker
1233                    ctes.push(("object_average_memory", r#"
1234SELECT om.global_id AS global_id,
1235       SUM(om.worker_memory) AS total_memory,
1236       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
1237       SUM(om.worker_records) AS total_records,
1238       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
1239  FROM object_memory om
1240GROUP BY om.global_id"#));
1241
1242                    from.push("LEFT JOIN object_memory om USING (global_id)");
1243                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");
1244
1245                    columns.extend([
1246                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
1247                        "pg_size_pretty(om.worker_memory) AS worker_memory",
1248                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
1249                        "pg_size_pretty(oam.total_memory) AS total_memory",
1250                        "om.max_operator_records_ratio AS max_operator_records_ratio",
1251                        "om.worker_records AS worker_records",
1252                        "oam.avg_records AS avg_records",
1253                        "oam.total_records AS total_records",
1254                    ]);
1255
1256                    order_by.extend([
1257                        "max_operator_memory_ratio DESC",
1258                        "max_operator_records_ratio DESC",
1259                        "om.worker_memory DESC",
1260                        "worker_records DESC",
1261                    ]);
1262
1263                    if set_worker_id {
1264                        order_by.push("worker_id");
1265                    }
1266                } else {
1267                    // no skew, so just compute totals
1268                    ctes.push((
1269                        "per_operator_memory_totals",
1270                        r#"
1271    SELECT mlm.global_id AS global_id,
1272           mlm.lir_id AS lir_id,
1273           SUM(mas.size) AS total_memory,
1274           SUM(mas.records) AS total_records
1275    FROM        mz_introspection.mz_lir_mapping mlm
1276     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1277           JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1278             ON (mas.operator_id = valid_id)
1279    GROUP BY mlm.global_id, mlm.lir_id"#,
1280                    ));
1281
1282                    ctes.push((
1283                        "object_memory_totals",
1284                        r#"
1285SELECT pomt.global_id AS global_id,
1286       SUM(pomt.total_memory) AS total_memory,
1287       SUM(pomt.total_records) AS total_records
1288FROM per_operator_memory_totals pomt
1289GROUP BY pomt.global_id
1290"#,
1291                    ));
1292
1293                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
1294                    columns.extend([
1295                        "pg_size_pretty(omt.total_memory) AS total_memory",
1296                        "omt.total_records AS total_records",
1297                    ]);
1298                    order_by.extend(["omt.total_memory DESC", "total_records DESC"]);
1299                }
1300            }
1301            ExplainAnalyzeComputationProperty::Cpu => {
1302                if skew {
1303                    let mut set_worker_id = false;
1304                    if let Some(worker_id) = worker_id {
1305                        // join condition if we're showing skew for more than one property
1306                        predicates.push(format!("oc.worker_id = {worker_id}"));
1307                    } else {
1308                        worker_id = Some("oc.worker_id");
1309                        columns.push("oc.worker_id AS worker_id");
1310                        set_worker_id = true; // we'll add ourselves to `order_by` later
1311                    };
1312
1313                    // computes the average memory per LIR operator (for per operator ratios)
1314                    ctes.push((
1315    "per_operator_cpu_summary",
1316    r#"
1317SELECT mlm.global_id AS global_id,
1318       mlm.lir_id AS lir_id,
1319       SUM(mse.elapsed_ns) AS total_ns,
1320       CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
1321FROM       mz_introspection.mz_lir_mapping mlm
1322CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1323      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1324        ON (mse.id = valid_id)
1325GROUP BY mlm.global_id, mlm.lir_id"#,
1326));
1327
1328                    // computes the CPU per worker in a per operator way
1329                    ctes.push((
1330                        "per_operator_cpu_per_worker",
1331                        r#"
1332SELECT mlm.global_id AS global_id,
1333       mlm.lir_id AS lir_id,
1334       mse.worker_id AS worker_id,
1335       SUM(mse.elapsed_ns) AS worker_ns
1336FROM       mz_introspection.mz_lir_mapping mlm
1337CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1338      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1339        ON (mse.id = valid_id)
1340GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1341                    ));
1342
1343                    // computes CPU ratios per worker per operator
1344                    ctes.push((
1345                        "per_operator_cpu_ratios",
1346                        r#"
1347SELECT pocpw.global_id AS global_id,
1348       pocpw.lir_id AS lir_id,
1349       pocpw.worker_id AS worker_id,
1350       CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
1351FROM      per_operator_cpu_per_worker pocpw
1352     JOIN per_operator_cpu_summary pocs
1353     USING (global_id, lir_id)
1354"#,
1355                    ));
1356
1357                    // summarizes each object, per worker
1358                    ctes.push((
1359                        "object_cpu",
1360                        r#"
1361SELECT pocpw.global_id AS global_id,
1362       pocpw.worker_id AS worker_id,
1363       MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
1364       SUM(pocpw.worker_ns) AS worker_ns
1365FROM      per_operator_cpu_per_worker pocpw
1366     JOIN per_operator_cpu_ratios pomr
1367     USING (global_id, worker_id, lir_id)
1368GROUP BY pocpw.global_id, pocpw.worker_id
1369"#,
1370                    ));
1371
1372                    // summarizes each worker
1373                    ctes.push((
1374                        "object_average_cpu",
1375                        r#"
1376SELECT oc.global_id AS global_id,
1377       SUM(oc.worker_ns) AS total_ns,
1378       CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
1379  FROM object_cpu oc
1380GROUP BY oc.global_id"#,));
1381
1382                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
1383                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");
1384
1385                    columns.extend([
1386                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
1387                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
1388                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
1389                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
1390                    ]);
1391
1392                    order_by.extend(["max_operator_cpu_ratio DESC", "worker_elapsed DESC"]);
1393
1394                    if set_worker_id {
1395                        order_by.push("worker_id");
1396                    }
1397                } else {
1398                    // no skew, so just compute totals
1399                    ctes.push((
1400                        "per_operator_cpu_totals",
1401                        r#"
1402    SELECT mlm.global_id AS global_id,
1403           mlm.lir_id AS lir_id,
1404           SUM(mse.elapsed_ns) AS total_ns
1405    FROM        mz_introspection.mz_lir_mapping mlm
1406     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1407           JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1408             ON (mse.id = valid_id)
1409    GROUP BY mlm.global_id, mlm.lir_id"#,
1410                    ));
1411
1412                    ctes.push((
1413                        "object_cpu_totals",
1414                        r#"
1415SELECT poct.global_id AS global_id,
1416       SUM(poct.total_ns) AS total_ns
1417FROM per_operator_cpu_totals poct
1418GROUP BY poct.global_id
1419"#,
1420                    ));
1421
1422                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
1423                    columns
1424                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
1425                    order_by.extend(["total_elapsed DESC"]);
1426                }
1427            }
1428        }
1429    }
1430
1431    // generate SQL query text
1432    let ctes = if !ctes.is_empty() {
1433        format!(
1434            "WITH {}",
1435            separated(
1436                ",\n",
1437                ctes.iter()
1438                    .map(|(name, defn)| format!("{name} AS ({defn})"))
1439            )
1440        )
1441    } else {
1442        String::new()
1443    };
1444    let columns = separated(", ", columns);
1445    let from = separated(" ", from);
1446    let predicates = if !predicates.is_empty() {
1447        format!("WHERE {}", separated(" AND ", predicates))
1448    } else {
1449        String::new()
1450    };
1451    // add mo.name last, to break ties only
1452    order_by.push("mo.name DESC");
1453    let order_by = separated(", ", order_by);
1454    let query = format!(
1455        r#"{ctes}
1456SELECT {columns}
1457FROM {from}
1458{predicates}
1459ORDER BY {order_by}"#
1460    );
1461
1462    if statement.as_sql {
1463        let rows = vec![Row::pack_slice(&[Datum::String(
1464            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1465                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1466            })?,
1467        )])];
1468        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1469
1470        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1471    } else {
1472        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1473        show_select.plan()
1474    }
1475}
1476
1477pub fn plan_explain_timestamp(
1478    scx: &StatementContext,
1479    explain: ExplainTimestampStatement<Aug>,
1480) -> Result<Plan, PlanError> {
1481    let (format, _verbose_syntax) = match explain.format() {
1482        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1483        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1484        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1485        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1486    };
1487
1488    let raw_plan = {
1489        let query::PlannedRootQuery {
1490            expr: raw_plan,
1491            desc: _,
1492            finishing: _,
1493            scope: _,
1494        } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1495        if raw_plan.contains_parameters()? {
1496            return Err(PlanError::ParameterNotAllowed(
1497                "EXPLAIN TIMESTAMP".to_string(),
1498            ));
1499        }
1500
1501        raw_plan
1502    };
1503    let when = query::plan_as_of(scx, explain.select.as_of)?;
1504
1505    Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1506        format,
1507        raw_plan,
1508        when,
1509    }))
1510}
1511
1512generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1513
/// Describes the output relation of a `SUBSCRIBE` statement.
///
/// Every output starts with a synthetic `mz_timestamp` column (plus
/// `mz_progressed` when the `PROGRESS` option is set); the remaining columns
/// depend on the requested output envelope.
pub fn describe_subscribe(
    scx: &StatementContext,
    stmt: SubscribeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // Determine the relation description of the subscribed-to item or query.
    let relation_desc = match stmt.relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            match item.relation_desc() {
                Some(desc) => desc.into_owned(),
                None => sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                ),
            }
        }
        SubscribeRelation::Query(query) => {
            let query::PlannedRootQuery { desc, .. } =
                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
            desc
        }
    };
    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
    let progress = progress.unwrap_or(false);
    // All subscribe outputs lead with the timestamp of the update.
    let mut desc = RelationDesc::builder().with_column(
        "mz_timestamp",
        SqlScalarType::Numeric {
            max_scale: Some(NumericMaxScale::ZERO),
        }
        .nullable(false),
    );
    if progress {
        desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
    }

    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
    match stmt.output {
        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
            desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
            for (name, mut ty) in relation_desc.into_iter() {
                // Progress rows carry nulls for the data columns, so all data
                // columns must become nullable when PROGRESS is requested.
                if progress {
                    ty.nullable = true;
                }
                desc = desc.with_column(name, ty);
            }
        }
        SubscribeOutput::EnvelopeUpsert { key_columns }
        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
            desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
            let key_columns = key_columns
                .into_iter()
                .map(normalize::column_name)
                .collect_vec();
            let mut before_values_desc = RelationDesc::builder();
            let mut after_values_desc = RelationDesc::builder();

            // Add the key columns in the order that they're specified.
            for column_name in &key_columns {
                let mut column_ty = relation_desc
                    .get_by_name(column_name)
                    .map(|(_pos, ty)| ty.clone())
                    .ok_or_else(|| PlanError::UnknownColumn {
                        table: None,
                        column: column_name.clone(),
                        similar: Box::new([]),
                    })?;
                if progress {
                    column_ty.nullable = true;
                }
                desc = desc.with_column(column_name, column_ty);
            }

            // Then add the remaining columns in the order from the original
            // table, filtering out the key columns since we added those above.
            for (mut name, mut ty) in relation_desc
                .into_iter()
                .filter(|(name, _ty)| !key_columns.contains(name))
            {
                // Value columns are always nullable in these envelopes.
                ty.nullable = true;
                before_values_desc =
                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
                if debezium {
                    name = format!("after_{}", name).into();
                }
                after_values_desc = after_values_desc.with_column(name, ty);
            }

            // Debezium output carries both before- and after-images; the
            // upsert envelope carries only the (after) values.
            if debezium {
                desc = desc.concat(before_values_desc);
            }
            desc = desc.concat(after_values_desc);
        }
    }
    Ok(StatementDesc::new(Some(desc.finish())))
}
1609
/// Plans a `SUBSCRIBE` statement into a [`Plan::Subscribe`].
///
/// `copy_to` is set when the subscribe appears as the relation of a
/// `COPY ... TO STDOUT` statement, in which case the output is emitted in
/// that copy format.
pub fn plan_subscribe(
    scx: &StatementContext,
    SubscribeStatement {
        relation,
        options,
        as_of,
        up_to,
        output,
    }: SubscribeStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<Plan, PlanError> {
    // Resolve what is being subscribed to: either a catalog item by name or
    // an ad hoc query.
    let (from, desc, scope) = match relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            let Some(desc) = item.relation_desc() else {
                sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                );
            };
            let item_name = match name {
                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
                _ => None,
            };
            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
            (
                SubscribeFrom::Id(item.global_id()),
                desc.into_owned(),
                scope,
            )
        }
        SubscribeRelation::Query(query) => {
            #[allow(deprecated)] // TODO(aalexandrov): Use HirRelationExpr in Subscribe
            let query::PlannedRootQuery {
                mut expr,
                desc,
                finishing,
                scope,
            } = query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
            expr.bind_parameters(scx, QueryLifetime::Subscribe, params)?;
            let query = query::PlannedRootQuery {
                expr,
                desc,
                finishing,
                scope,
            };
            // There's no way to apply finishing operations to a `SUBSCRIBE` directly, so the
            // finishing should have already been turned into a `TopK` by
            // `plan_query` / `plan_root_query`, upon seeing the `QueryLifetime::Subscribe`.
            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
                &query.finishing,
                query.desc.arity()
            ));
            let desc = query.desc.clone();
            (
                SubscribeFrom::Query {
                    expr: query.expr,
                    desc: query.desc,
                },
                desc,
                query.scope,
            )
        }
    };

    let when = query::plan_as_of(scx, as_of)?;
    let up_to = up_to
        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
        .transpose()?;

    // Expression context used to plan the key / order-by expressions of the
    // output envelopes below.
    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
    let ecx = ExprContext {
        qcx: &qcx,
        name: "",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
    let output = match output {
        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
        SubscribeOutput::EnvelopeUpsert { key_columns } => {
            // Plan the key columns as ORDER BY expressions; any key that
            // requires a computed expression (non-empty `map_exprs`) is
            // rejected.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE UPSERT KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
            }
            plan::SubscribeOutput::EnvelopeUpsert {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::EnvelopeDebezium { key_columns } => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
            // Same key-planning strategy as `ENVELOPE UPSERT` above.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE DEBEZIUM KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
            }
            plan::SubscribeOutput::EnvelopeDebezium {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
            // The ORDER BY may reference `mz_diff`, which is not part of the
            // underlying relation, so prepend it to the visible columns.
            let mz_diff = "mz_diff".into();
            let output_columns = std::iter::once((0, &mz_diff))
                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
                .collect_vec();
            match query::plan_order_by_exprs(
                &ExprContext {
                    name: "WITHIN TIMESTAMP ORDER BY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            ) {
                Err(PlanError::UnknownColumn {
                    table: None,
                    column,
                    similar: _,
                }) if &column == &mz_diff => {
                    // mz_diff is being used in an expression. Since mz_diff isn't part of the table
                    // it looks like an unknown column. Instead, return a better error
                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                }
                Err(e) => return Err(e),
                Ok((order_by, map_exprs)) => {
                    if !map_exprs.is_empty() {
                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                    }

                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
                }
            }
        }
    };

    let SubscribeOptionExtracted {
        progress, snapshot, ..
    } = options.try_into()?;
    Ok(Plan::Subscribe(SubscribePlan {
        from,
        when,
        up_to,
        // Snapshot defaults to on; progress reporting defaults to off.
        with_snapshot: snapshot.unwrap_or(true),
        copy_to,
        emit_progress: progress.unwrap_or(false),
        output,
    }))
}
1794
1795pub fn describe_copy_from_table(
1796    scx: &StatementContext,
1797    table_name: <Aug as AstInfo>::ItemName,
1798    columns: Vec<Ident>,
1799) -> Result<StatementDesc, PlanError> {
1800    let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1801    Ok(StatementDesc::new(Some(desc)))
1802}
1803
1804pub fn describe_copy_item(
1805    scx: &StatementContext,
1806    object_name: <Aug as AstInfo>::ItemName,
1807    columns: Vec<Ident>,
1808) -> Result<StatementDesc, PlanError> {
1809    let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1810    Ok(StatementDesc::new(Some(desc)))
1811}
1812
1813pub fn describe_copy(
1814    scx: &StatementContext,
1815    CopyStatement {
1816        relation,
1817        direction,
1818        ..
1819    }: CopyStatement<Aug>,
1820) -> Result<StatementDesc, PlanError> {
1821    Ok(match (relation, direction) {
1822        (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1823            describe_copy_item(scx, name, columns)?
1824        }
1825        (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1826            describe_copy_from_table(scx, name, columns)?
1827        }
1828        (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1829        (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1830    }
1831    .with_is_copy())
1832}
1833
/// Plans `COPY ... TO <expr>` (copy-out to a remote destination through an
/// AWS connection).
///
/// Requires an `AWS CONNECTION` option and a `CSV` or `PARQUET` format. The
/// `to` expression is planned as a string-typed scalar naming the
/// destination, and `MAX FILE SIZE` is checked against the allowed S3 sink
/// bounds.
fn plan_copy_to_expr(
    scx: &StatementContext,
    select_plan: SelectPlan,
    desc: RelationDesc,
    to: &Expr<Aug>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    let conn_id = match options.aws_connection {
        Some(conn_id) => CatalogItemId::from(conn_id),
        None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
    };
    let connection = scx.get_item(&conn_id).connection()?;

    // Only AWS connections can serve as a copy-to destination.
    match connection {
        mz_storage_types::connections::Connection::Aws(_) => {}
        _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
    }

    // Translate the COPY format plus format-specific options into the sink
    // format.
    let format = match format {
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            S3SinkFormat::PgCopy(CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            ))
        }
        CopyFormat::Parquet => {
            // Validate that the output desc can be formatted as parquet
            ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
            S3SinkFormat::Parquet
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
    };

    // Converting the to expr to a HirScalarExpr
    let mut to_expr = to.clone();
    transform_ast::transform(scx, &mut to_expr)?;
    // The destination expression is planned against an empty relation: it may
    // not reference columns, aggregates, subqueries, or parameters.
    let relation_type = RelationDesc::empty();
    let ecx = &ExprContext {
        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
        name: "COPY TO target",
        scope: &Scope::empty(),
        relation_type: relation_type.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };

    let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;

    // Enforce the allowed range for MAX FILE SIZE.
    if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
        sql_bail!(
            "MAX FILE SIZE cannot be less than {}",
            MIN_S3_SINK_FILE_SIZE
        );
    }
    if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
        sql_bail!(
            "MAX FILE SIZE cannot be greater than {}",
            MAX_S3_SINK_FILE_SIZE
        );
    }

    Ok(Plan::CopyTo(CopyToPlan {
        select_plan,
        desc,
        to,
        connection: connection.to_owned(),
        connection_id: conn_id,
        format,
        max_file_size: options.max_file_size.as_bytes(),
    }))
}
1918
/// Plans `COPY ... FROM` into a table.
///
/// The data source may be `STDIN` or an expression naming a remote location
/// (an S3 location when an `AWS CONNECTION` option is provided, otherwise a
/// plain URL).
fn plan_copy_from(
    scx: &StatementContext,
    target: &CopyTarget<Aug>,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // Rejects an option that is only meaningful for the CSV format.
    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
        match option {
            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
            None => Ok(()),
        }
    }

    let source = match target {
        CopyTarget::Stdin => CopyFromSource::Stdin,
        CopyTarget::Expr(from) => {
            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;

            // Converting the expr to an HirScalarExpr
            let mut from_expr = from.clone();
            transform_ast::transform(scx, &mut from_expr)?;
            // The source expression is planned against an empty relation: it
            // may not reference columns, aggregates, subqueries, or
            // parameters.
            let relation_type = RelationDesc::empty();
            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "COPY FROM target",
                scope: &Scope::empty(),
                relation_type: relation_type.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;

            // With an AWS connection the expression names an S3 location;
            // otherwise it is treated as a URL.
            match options.aws_connection {
                Some(conn_id) => {
                    let conn_id = CatalogItemId::from(conn_id);

                    // Validate the connection type is one we expect.
                    let connection = match scx.get_item(&conn_id).connection()? {
                        mz_storage_types::connections::Connection::Aws(conn) => conn,
                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
                    };

                    CopyFromSource::AwsS3 {
                        uri: from,
                        connection,
                        connection_id: conn_id,
                    }
                }
                None => CopyFromSource::Url(from),
            }
        }
        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
    };

    // Translate the user-supplied options into format-specific parameters.
    let params = match format {
        CopyFormat::Text => {
            only_available_with_csv(options.quote, "quote")?;
            only_available_with_csv(options.escape, "escape")?;
            only_available_with_csv(options.header, "HEADER")?;
            let delimiter =
                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
            let null = match options.null {
                Some(null) => Cow::from(null),
                None => Cow::from("\\N"),
            };
            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
        }
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            )
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Parquet => CopyFormatParams::Parquet,
    };

    // FILES and PATTERN are mutually exclusive source filters.
    // NOTE(review): `bail_unsupported!` phrases its message as a
    // not-yet-supported feature, which reads oddly for this user error;
    // confirm whether `sql_bail!` would be the intended error category.
    let filter = match (options.files, options.pattern) {
        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
        (Some(files), None) => Some(CopyFromFilter::Files(files)),
        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
        (None, None) => None,
    };

    // File filters only make sense for remote sources.
    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
    }

    // Capture the name before `table_name` is moved into `plan_copy_from`.
    let table_name_string = table_name.full_name_str();

    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;

    let Some(mfp) = maybe_mfp else {
        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
    };

    Ok(Plan::CopyFrom(CopyFromPlan {
        target_id: id,
        target_name: table_name_string,
        source,
        columns,
        source_desc,
        mfp,
        params,
        filter,
    }))
}
2039
2040fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2041    match v {
2042        Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2043        Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2044        None => Ok(None),
2045    }
2046}
2047
// Generates `CopyOptionExtracted`, which extracts the `WITH (...)` options of
// a `COPY` statement. `MAX FILE SIZE` defaults to 256 MB; `AWS CONNECTION`
// resolves to a catalog object reference.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
2061
/// Plans a `COPY` statement, dispatching on direction and target:
/// `COPY ... TO STDOUT` (via `SELECT` or `SUBSCRIBE`), `COPY ... FROM`, and
/// `COPY ... TO <expr>`.
pub fn plan_copy(
    scx: &StatementContext,
    CopyStatement {
        relation,
        direction,
        target,
        options,
    }: CopyStatement<Aug>,
) -> Result<Plan, PlanError> {
    let options = CopyOptionExtracted::try_from(options)?;
    // Parse any user-provided FORMAT option. If not provided, will default to
    // Text for COPY TO STDOUT and COPY FROM STDIN, but will error for COPY TO <expr>.
    let format = options
        .format
        .as_ref()
        .map(|format| match format.to_lowercase().as_str() {
            "text" => Ok(CopyFormat::Text),
            "csv" => Ok(CopyFormat::Csv),
            "binary" => Ok(CopyFormat::Binary),
            "parquet" => Ok(CopyFormat::Parquet),
            _ => sql_bail!("unknown FORMAT: {}", format),
        })
        .transpose()?;

    match (&direction, &target) {
        (CopyDirection::To, CopyTarget::Stdout) => {
            // These options are not supported (yet) for output to STDOUT.
            if options.delimiter.is_some() {
                sql_bail!("COPY TO does not support DELIMITER option yet");
            }
            if options.quote.is_some() {
                sql_bail!("COPY TO does not support QUOTE option yet");
            }
            if options.null.is_some() {
                sql_bail!("COPY TO does not support NULL option yet");
            }
            match relation {
                CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
                CopyRelation::Select(stmt) => Ok(plan_select(
                    scx,
                    stmt,
                    &Params::empty(),
                    Some(format.unwrap_or(CopyFormat::Text)),
                )?),
                CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
                    scx,
                    stmt,
                    &Params::empty(),
                    Some(format.unwrap_or(CopyFormat::Text)),
                )?),
            }
        }
        (CopyDirection::From, target) => match relation {
            CopyRelation::Named { name, columns } => plan_copy_from(
                scx,
                target,
                name,
                columns,
                format.unwrap_or(CopyFormat::Text),
                options,
            ),
            _ => sql_bail!("COPY FROM {} not supported", target),
        },
        (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
            // System users are always allowed to use this feature, even when
            // the flag is disabled, so that we can dogfood for analytics in
            // production environments. The feature is stable enough that we're
            // not worried about it crashing.
            if !scx.catalog.active_role_id().is_system() {
                scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
            }

            // COPY TO <expr> has no sensible default format.
            let format = match format {
                Some(inner) => inner,
                _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
            };

            let stmt = match relation {
                CopyRelation::Named { name, columns } => {
                    if !columns.is_empty() {
                        // TODO(mouli): Add support for this
                        sql_bail!(
                            "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
                        );
                    }
                    // Generate a synthetic SELECT query that just gets the table
                    let query = Query {
                        ctes: CteBlock::empty(),
                        body: SetExpr::Table(name),
                        order_by: vec![],
                        limit: None,
                        offset: None,
                    };
                    SelectStatement { query, as_of: None }
                }
                CopyRelation::Select(stmt) => {
                    if !stmt.query.order_by.is_empty() {
                        sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
                    }
                    stmt
                }
                CopyRelation::Subscribe(_) => {
                    sql_bail!("COPY {} {} not supported", direction, target)
                }
            };

            // Plan the (possibly synthetic) SELECT, then wrap it in a copy-to
            // plan.
            let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
            plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
        }
        _ => sql_bail!("COPY {} {} not supported", direction, target),
    }
}