// mz_sql/plan/statement/dml.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data manipulation language (DML).
11//!
12//! This module houses the handlers for statements that manipulate data, like
13//! `INSERT`, `SELECT`, `SUBSCRIBE`, and `COPY`.
14
15use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19use mz_arrow_util::builder::ArrowBuilder;
20use mz_expr::RowSetFinishing;
21use mz_expr::visit::Visit;
22use mz_ore::num::NonNeg;
23use mz_ore::soft_panic_or_log;
24use mz_ore::str::separated;
25use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
26use mz_repr::adt::numeric::NumericMaxScale;
27use mz_repr::bytes::ByteSize;
28use mz_repr::explain::{ExplainConfig, ExplainFormat};
29use mz_repr::optimize::OptimizerFeatureOverrides;
30use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
31use mz_sql_parser::ast::{
32    CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
33    ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
34    ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
35    ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
36    SetExpr, SubscribeOutput, UnresolvedItemName,
37};
38use mz_sql_parser::ident;
39use mz_storage_types::sinks::{
40    KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
41    MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
42};
43
44use crate::ast::display::{AstDisplay, escaped_string_literal};
45use crate::ast::{
46    AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
47    DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
48    SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
49    UpdateStatement,
50};
51use crate::catalog::CatalogItemType;
52use crate::names::{Aug, ResolvedItemName};
53use crate::normalize;
54use crate::plan::query::{
55    ExprContext, QueryLifetime, offset_into_value, plan_as_of_or_up_to, plan_expr,
56};
57use crate::plan::scope::Scope;
58use crate::plan::statement::show::ShowSelect;
59use crate::plan::statement::{StatementContext, StatementDesc, ddl};
60use crate::plan::{
61    self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
62    ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
63};
64use crate::plan::{
65    CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
66    QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
67};
68use crate::plan::{CopyFromSource, with_options};
69use crate::session::vars::{
70    self, DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF, ENABLE_COPY_FROM_REMOTE,
71};
72
73// TODO(benesch): currently, describing a `SELECT` or `INSERT` query
74// plans the whole query to determine its shape and parameter types,
75// and then throws away that plan. If we were smarter, we'd stash that
76// plan somewhere so we don't have to recompute it when the query is
77// executed.
78
79pub fn describe_insert(
80    scx: &StatementContext,
81    InsertStatement {
82        table_name,
83        columns,
84        source,
85        returning,
86    }: InsertStatement<Aug>,
87) -> Result<StatementDesc, PlanError> {
88    let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
89    let desc = if returning.expr.is_empty() {
90        None
91    } else {
92        Some(returning.desc)
93    };
94    Ok(StatementDesc::new(desc))
95}
96
97pub fn plan_insert(
98    scx: &StatementContext,
99    InsertStatement {
100        table_name,
101        columns,
102        source,
103        returning,
104    }: InsertStatement<Aug>,
105    params: &Params,
106) -> Result<Plan, PlanError> {
107    let (id, mut expr, returning) =
108        query::plan_insert_query(scx, table_name, columns, source, returning)?;
109    expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
110    let returning = returning
111        .expr
112        .into_iter()
113        .map(|mut expr| {
114            expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
115            expr.lower_uncorrelated(scx.catalog.system_vars())
116        })
117        .collect::<Result<Vec<_>, _>>()?;
118
119    Ok(Plan::Insert(InsertPlan {
120        id,
121        values: expr,
122        returning,
123    }))
124}
125
126pub fn describe_delete(
127    scx: &StatementContext,
128    stmt: DeleteStatement<Aug>,
129) -> Result<StatementDesc, PlanError> {
130    query::plan_delete_query(scx, stmt)?;
131    Ok(StatementDesc::new(None))
132}
133
134pub fn plan_delete(
135    scx: &StatementContext,
136    stmt: DeleteStatement<Aug>,
137    params: &Params,
138) -> Result<Plan, PlanError> {
139    let rtw_plan = query::plan_delete_query(scx, stmt)?;
140    plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
141}
142
143pub fn describe_update(
144    scx: &StatementContext,
145    stmt: UpdateStatement<Aug>,
146) -> Result<StatementDesc, PlanError> {
147    query::plan_update_query(scx, stmt)?;
148    Ok(StatementDesc::new(None))
149}
150
151pub fn plan_update(
152    scx: &StatementContext,
153    stmt: UpdateStatement<Aug>,
154    params: &Params,
155) -> Result<Plan, PlanError> {
156    let rtw_plan = query::plan_update_query(scx, stmt)?;
157    plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
158}
159
160pub fn plan_read_then_write(
161    scx: &StatementContext,
162    kind: MutationKind,
163    params: &Params,
164    query::ReadThenWritePlan {
165        id,
166        mut selection,
167        finishing,
168        assignments,
169    }: query::ReadThenWritePlan,
170) -> Result<Plan, PlanError> {
171    selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
172    let mut assignments_outer = BTreeMap::new();
173    for (idx, mut set) in assignments {
174        set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
175        let set = set.lower_uncorrelated(scx.catalog.system_vars())?;
176        assignments_outer.insert(idx, set);
177    }
178
179    Ok(Plan::ReadThenWrite(ReadThenWritePlan {
180        id,
181        selection,
182        finishing,
183        assignments: assignments_outer,
184        kind,
185        returning: Vec::new(),
186    }))
187}
188
189pub fn describe_select(
190    scx: &StatementContext,
191    stmt: SelectStatement<Aug>,
192) -> Result<StatementDesc, PlanError> {
193    if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
194        return Ok(StatementDesc::new(Some(desc)));
195    }
196
197    let query::PlannedRootQuery { desc, .. } =
198        query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
199    Ok(StatementDesc::new(Some(desc)))
200}
201
202pub fn plan_select(
203    scx: &StatementContext,
204    select: SelectStatement<Aug>,
205    params: &Params,
206    copy_to: Option<CopyFormat>,
207) -> Result<Plan, PlanError> {
208    if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
209        return Ok(Plan::SideEffectingFunc(f));
210    }
211
212    let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
213    Ok(Plan::Select(plan))
214}
215
/// Plans the body of a `SELECT` statement, returning both the plan and the
/// relation description of its result.
///
/// Binds `params` into the query, concretizes the `LIMIT`/`OFFSET` of the
/// row-set finishing from scalar expressions to literal values, and enforces
/// the AS OF restriction on unmaterializable functions.
fn plan_select_inner(
    scx: &StatementContext,
    select: SelectStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<(SelectPlan, RelationDesc), PlanError> {
    // Resolve the AS OF clause (if any) before planning the query proper.
    let when = query::plan_as_of(scx, select.as_of.clone())?;
    let lifetime = QueryLifetime::OneShot;
    let query::PlannedRootQuery {
        mut expr,
        desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
    expr.bind_parameters(scx, lifetime, params)?;

    // OFFSET clauses in `expr` should become constants with the above binding of parameters.
    // Let's check this and simplify them to literals.
    expr.try_visit_mut_pre(&mut |expr| {
        if let HirRelationExpr::TopK { offset, .. } = expr {
            let offset_value = offset_into_value(offset.take())?;
            *offset = HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64);
        }
        Ok::<(), PlanError>(())
    })?;
    // (We don't need to simplify LIMIT clauses in `expr`, because we can handle non-constant
    // expressions there. If they happen to be simplifiable to literals, then the optimizer will do
    // so later.)

    // We need to concretize the `limit` and `offset` of the RowSetFinishing, so that we go from
    // `RowSetFinishing<HirScalarExpr, HirScalarExpr>` to `RowSetFinishing`.
    // This involves binding parameters and evaluating each expression to a number.
    // (This should be possible even for `limit` here, because we are at the top level of a SELECT,
    // so this `limit` has to be a constant.)
    let limit = match finishing.limit {
        None => None,
        Some(mut limit) => {
            limit.bind_parameters(scx, lifetime, params)?;
            // TODO: Call `try_into_literal_int64` instead of `as_literal`.
            let Some(limit) = limit.as_literal() else {
                sql_bail!(
                    "Top-level LIMIT must be a constant expression, got {}",
                    limit
                )
            };
            match limit {
                // `LIMIT NULL` is equivalent to no limit at all.
                Datum::Null => None,
                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
                _ => {
                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
                    sql_bail!("LIMIT must be a non-negative INT or NULL")
                }
            }
        }
    };
    let offset = {
        let mut offset = finishing.offset.clone();
        offset.bind_parameters(scx, lifetime, params)?;
        let offset = offset_into_value(offset.take())?;
        offset
            .try_into()
            .expect("checked in offset_into_value that it is not negative")
    };

    // Unmaterializable functions are evaluated based on various information (e.g., the catalog)
    // during sequencing, without taking AS OF into account. This would be hard to fix, so for now
    // we just disallow AS OF when there is an unmaterializable function in a query (except mz_now).
    if scx.is_feature_flag_enabled(&DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF)
        && select.as_of.is_some()
        && expr.contains_unmaterializable_except_temporal()?
    {
        bail_unsupported!("unmaterializable function (except `mz_now`) in an AS OF query");
    }

    let plan = SelectPlan {
        source: expr,
        when,
        finishing: RowSetFinishing {
            limit,
            offset,
            project: finishing.project,
            order_by: finishing.order_by,
        },
        copy_to,
        // NOTE(review): the original statement is retained in the plan;
        // presumably for later display/introspection — confirm with callers.
        select: Some(Box::new(select)),
    };

    Ok((plan, desc))
}
305
306pub fn describe_explain_plan(
307    scx: &StatementContext,
308    explain: ExplainPlanStatement<Aug>,
309) -> Result<StatementDesc, PlanError> {
310    let mut relation_desc = RelationDesc::builder();
311
312    match explain.stage() {
313        ExplainStage::RawPlan => {
314            let name = "Raw Plan";
315            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
316        }
317        ExplainStage::DecorrelatedPlan => {
318            let name = "Decorrelated Plan";
319            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
320        }
321        ExplainStage::LocalPlan => {
322            let name = "Locally Optimized Plan";
323            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
324        }
325        ExplainStage::GlobalPlan => {
326            let name = "Optimized Plan";
327            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
328        }
329        ExplainStage::PhysicalPlan => {
330            let name = "Physical Plan";
331            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
332        }
333        ExplainStage::Trace => {
334            relation_desc = relation_desc
335                .with_column("Time", SqlScalarType::UInt64.nullable(false))
336                .with_column("Path", SqlScalarType::String.nullable(false))
337                .with_column("Plan", SqlScalarType::String.nullable(false));
338        }
339        ExplainStage::PlanInsights => {
340            let name = "Plan Insights";
341            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
342        }
343    };
344    let relation_desc = relation_desc.finish();
345
346    Ok(
347        StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
348            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
349            _ => vec![],
350        }),
351    )
352}
353
354pub fn describe_explain_pushdown(
355    scx: &StatementContext,
356    statement: ExplainPushdownStatement<Aug>,
357) -> Result<StatementDesc, PlanError> {
358    let relation_desc = RelationDesc::builder()
359        .with_column("Source", SqlScalarType::String.nullable(false))
360        .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
361        .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
362        .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
363        .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
364        .finish();
365
366    Ok(
367        StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
368            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
369            _ => vec![],
370        }),
371    )
372}
373
374pub fn describe_explain_analyze_object(
375    _scx: &StatementContext,
376    statement: ExplainAnalyzeObjectStatement<Aug>,
377) -> Result<StatementDesc, PlanError> {
378    if statement.as_sql {
379        let relation_desc = RelationDesc::builder()
380            .with_column("SQL", SqlScalarType::String.nullable(false))
381            .finish();
382        return Ok(StatementDesc::new(Some(relation_desc)));
383    }
384
385    match statement.properties {
386        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
387            properties,
388            skew,
389        }) => {
390            let mut relation_desc = RelationDesc::builder()
391                .with_column("operator", SqlScalarType::String.nullable(false));
392
393            if skew {
394                relation_desc =
395                    relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
396            }
397
398            let mut seen_properties = BTreeSet::new();
399            for property in properties {
400                // handle each property only once (belt and suspenders)
401                if !seen_properties.insert(property) {
402                    continue;
403                }
404
405                match property {
406                    ExplainAnalyzeComputationProperty::Memory if skew => {
407                        let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
408                        relation_desc = relation_desc
409                            .with_column("memory_ratio", numeric.clone())
410                            .with_column("worker_memory", SqlScalarType::String.nullable(true))
411                            .with_column("avg_memory", SqlScalarType::String.nullable(true))
412                            .with_column("total_memory", SqlScalarType::String.nullable(true))
413                            .with_column("records_ratio", numeric.clone())
414                            .with_column("worker_records", numeric.clone())
415                            .with_column("avg_records", numeric.clone())
416                            .with_column("total_records", numeric);
417                    }
418                    ExplainAnalyzeComputationProperty::Memory => {
419                        relation_desc = relation_desc
420                            .with_column("total_memory", SqlScalarType::String.nullable(true))
421                            .with_column(
422                                "total_records",
423                                SqlScalarType::Numeric { max_scale: None }.nullable(true),
424                            );
425                    }
426                    ExplainAnalyzeComputationProperty::Cpu => {
427                        if skew {
428                            relation_desc = relation_desc
429                                .with_column(
430                                    "cpu_ratio",
431                                    SqlScalarType::Numeric { max_scale: None }.nullable(true),
432                                )
433                                .with_column(
434                                    "worker_elapsed",
435                                    SqlScalarType::Interval.nullable(true),
436                                )
437                                .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
438                        }
439                        relation_desc = relation_desc
440                            .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
441                    }
442                }
443            }
444
445            let relation_desc = relation_desc.finish();
446            Ok(StatementDesc::new(Some(relation_desc)))
447        }
448        ExplainAnalyzeProperty::Hints => {
449            let relation_desc = RelationDesc::builder()
450                .with_column("operator", SqlScalarType::String.nullable(true))
451                .with_column("levels", SqlScalarType::Int64.nullable(true))
452                .with_column("to_cut", SqlScalarType::Int64.nullable(true))
453                .with_column("hint", SqlScalarType::Float64.nullable(true))
454                .with_column("savings", SqlScalarType::String.nullable(true))
455                .finish();
456            Ok(StatementDesc::new(Some(relation_desc)))
457        }
458    }
459}
460
461pub fn describe_explain_analyze_cluster(
462    _scx: &StatementContext,
463    statement: ExplainAnalyzeClusterStatement,
464) -> Result<StatementDesc, PlanError> {
465    if statement.as_sql {
466        let relation_desc = RelationDesc::builder()
467            .with_column("SQL", SqlScalarType::String.nullable(false))
468            .finish();
469        return Ok(StatementDesc::new(Some(relation_desc)));
470    }
471
472    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
473
474    let mut relation_desc = RelationDesc::builder()
475        .with_column("object", SqlScalarType::String.nullable(false))
476        .with_column("global_id", SqlScalarType::String.nullable(false));
477
478    if skew {
479        relation_desc =
480            relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
481    }
482
483    let mut seen_properties = BTreeSet::new();
484    for property in properties {
485        // handle each property only once (belt and suspenders)
486        if !seen_properties.insert(property) {
487            continue;
488        }
489
490        match property {
491            ExplainAnalyzeComputationProperty::Memory if skew => {
492                let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
493                relation_desc = relation_desc
494                    .with_column("max_operator_memory_ratio", numeric.clone())
495                    .with_column("worker_memory", SqlScalarType::String.nullable(true))
496                    .with_column("avg_memory", SqlScalarType::String.nullable(true))
497                    .with_column("total_memory", SqlScalarType::String.nullable(true))
498                    .with_column("max_operator_records_ratio", numeric.clone())
499                    .with_column("worker_records", numeric.clone())
500                    .with_column("avg_records", numeric.clone())
501                    .with_column("total_records", numeric);
502            }
503            ExplainAnalyzeComputationProperty::Memory => {
504                relation_desc = relation_desc
505                    .with_column("total_memory", SqlScalarType::String.nullable(true))
506                    .with_column(
507                        "total_records",
508                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
509                    );
510            }
511            ExplainAnalyzeComputationProperty::Cpu if skew => {
512                relation_desc = relation_desc
513                    .with_column(
514                        "max_operator_cpu_ratio",
515                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
516                    )
517                    .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
518                    .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
519                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
520            }
521            ExplainAnalyzeComputationProperty::Cpu => {
522                relation_desc = relation_desc
523                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
524            }
525        }
526    }
527
528    Ok(StatementDesc::new(Some(relation_desc.finish())))
529}
530
531pub fn describe_explain_timestamp(
532    scx: &StatementContext,
533    ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
534) -> Result<StatementDesc, PlanError> {
535    let relation_desc = RelationDesc::builder()
536        .with_column("Timestamp", SqlScalarType::String.nullable(false))
537        .finish();
538
539    Ok(StatementDesc::new(Some(relation_desc))
540        .with_params(describe_select(scx, select)?.param_types))
541}
542
543pub fn describe_explain_schema(
544    _: &StatementContext,
545    ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
546) -> Result<StatementDesc, PlanError> {
547    let relation_desc = RelationDesc::builder()
548        .with_column("Schema", SqlScalarType::String.nullable(false))
549        .finish();
550    Ok(StatementDesc::new(Some(relation_desc)))
551}
552
// Currently, there are two reasons for why a flag should be `Option<bool>` instead of simply
// `bool`:
// - When it's an override of a global feature flag, for example optimizer feature flags. In this
//   case, we need not just false and true, but also None to say "take the value of the global
//   flag".
// - When it's an override of whether SOFT_ASSERTIONS are enabled. For example, when `Arity` is not
//   explicitly given in the EXPLAIN command, then we'd like staging and prod to default to true,
//   but otherwise we'd like to default to false.
//
// NOTE(review): this invocation generates `ExplainPlanOptionExtracted` (consumed by the
// `TryFrom<ExplainPlanOptionExtracted> for ExplainConfig` impl below) from the listed
// `ExplainPlanOption` names, types, and defaults.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
595
impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
    type Error = PlanError;

    /// Converts extracted `EXPLAIN ... WITH (...)` options into an
    /// `ExplainConfig`, filling unspecified options with defaults that may
    /// depend on whether soft assertions are enabled.
    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
        // If `WITH(raw)` is specified, ensure that the config will be as
        // representative for the original plan as possible.
        if v.raw {
            v.raw_plans = true;
            v.raw_syntax = true;
        }

        // Certain config should default to be enabled in release builds running on
        // staging or prod (where SOFT_ASSERTIONS are turned off).
        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();

        Ok(ExplainConfig {
            arity: v.arity.unwrap_or(enable_on_prod),
            cardinality: v.cardinality,
            column_names: v.column_names,
            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
            // Humanized expressions are forced off when raw plans are requested.
            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
            join_impls: v.join_implementations,
            keys: v.keys,
            // Linear chains are likewise forced off for raw plans.
            linear_chains: !v.raw_plans && v.linear_chains,
            no_fast_path: v.no_fast_path,
            no_notices: v.no_notices,
            node_ids: v.node_identifiers,
            non_negative: v.non_negative,
            raw_plans: v.raw_plans,
            raw_syntax: v.raw_syntax,
            verbose_syntax: false,
            redacted: v.redacted,
            subtree_size: v.subtree_size,
            equivalences: v.equivalences,
            timing: v.timing,
            types: v.types,
            // The ones that are initialized with `Default::default()` are not wired up to EXPLAIN.
            features: OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: Default::default(),
                enable_eager_delta_joins: v.enable_eager_delta_joins,
                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
                enable_consolidate_after_union_negate: Default::default(),
                enable_reduce_mfp_fusion: Default::default(),
                enable_cardinality_estimates: Default::default(),
                persist_fast_path_limit: Default::default(),
                reoptimize_imported_views: v.reoptimize_imported_views,
                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse: v
                    .enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: Default::default(),
                enable_dequadratic_eqprop_map: Default::default(),
                enable_eq_classes_withholding_errors: Default::default(),
                enable_fast_path_plan_insights: Default::default(),
                enable_cast_elimination: Default::default(),
                enable_case_literal_transform: Default::default(),
                enable_simplify_quantified_comparisons: Default::default(),
                enable_coalesce_case_transform: Default::default(),
            },
        })
    }
}
659
660fn plan_explainee(
661    scx: &StatementContext,
662    explainee: Explainee<Aug>,
663    params: &Params,
664) -> Result<plan::Explainee, PlanError> {
665    use crate::plan::ExplaineeStatement;
666
667    let is_replan = matches!(
668        explainee,
669        Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
670    );
671
672    let explainee = match explainee {
673        Explainee::View(name) | Explainee::ReplanView(name) => {
674            let item = scx.get_item_by_resolved_name(&name)?;
675            let item_type = item.item_type();
676            if item_type != CatalogItemType::View {
677                sql_bail!("Expected {name} to be a view, not a {item_type}");
678            }
679            match is_replan {
680                true => crate::plan::Explainee::ReplanView(item.id()),
681                false => crate::plan::Explainee::View(item.id()),
682            }
683        }
684        Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
685            let item = scx.get_item_by_resolved_name(&name)?;
686            let item_type = item.item_type();
687            if item_type != CatalogItemType::MaterializedView {
688                sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
689            }
690            match is_replan {
691                true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
692                false => crate::plan::Explainee::MaterializedView(item.id()),
693            }
694        }
695        Explainee::Index(name) | Explainee::ReplanIndex(name) => {
696            let item = scx.get_item_by_resolved_name(&name)?;
697            let item_type = item.item_type();
698            if item_type != CatalogItemType::Index {
699                sql_bail!("Expected {name} to be an index, not a {item_type}");
700            }
701            match is_replan {
702                true => crate::plan::Explainee::ReplanIndex(item.id()),
703                false => crate::plan::Explainee::Index(item.id()),
704            }
705        }
706        Explainee::Select(select, broken) => {
707            let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
708            crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
709        }
710        Explainee::CreateView(mut stmt, broken) => {
711            if stmt.if_exists != IfExistsBehavior::Skip {
712                // If we don't force this parameter to Skip planning will
713                // fail for names that already exist in the catalog. This
714                // can happen even in `Replace` mode if the existing item
715                // has dependencies.
716                stmt.if_exists = IfExistsBehavior::Skip;
717            } else {
718                sql_bail!(
719                    "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
720                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
721                );
722            }
723
724            let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
725                sql_bail!("expected CreateViewPlan plan");
726            };
727
728            crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
729        }
730        Explainee::CreateMaterializedView(mut stmt, broken) => {
731            if stmt.if_exists != IfExistsBehavior::Skip {
732                // If we don't force this parameter to Skip planning will
733                // fail for names that already exist in the catalog. This
734                // can happen even in `Replace` mode if the existing item
735                // has dependencies.
736                stmt.if_exists = IfExistsBehavior::Skip;
737            } else {
738                sql_bail!(
739                    "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
740                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
741                );
742            }
743
744            let Plan::CreateMaterializedView(plan) =
745                ddl::plan_create_materialized_view(scx, *stmt)?
746            else {
747                sql_bail!("expected CreateMaterializedViewPlan plan");
748            };
749
750            crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
751                broken,
752                plan,
753            })
754        }
755        Explainee::CreateIndex(mut stmt, broken) => {
756            if !stmt.if_not_exists {
757                // If we don't force this parameter to true planning will
758                // fail for index items that already exist in the catalog.
759                stmt.if_not_exists = true;
760            } else {
761                sql_bail!(
762                    "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
763                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
764                );
765            }
766
767            let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
768                sql_bail!("expected CreateIndexPlan plan");
769            };
770
771            crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
772        }
773        Explainee::Subscribe(stmt, broken) => {
774            let Plan::Subscribe(plan) = plan_subscribe(scx, *stmt, params, None)? else {
775                sql_bail!("expected SubscribePlan");
776            };
777            crate::plan::Explainee::Statement(ExplaineeStatement::Subscribe { broken, plan })
778        }
779    };
780
781    Ok(explainee)
782}
783
784pub fn plan_explain_plan(
785    scx: &StatementContext,
786    explain: ExplainPlanStatement<Aug>,
787    params: &Params,
788) -> Result<Plan, PlanError> {
789    let (format, verbose_syntax) = match explain.format() {
790        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
791        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
792        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
793        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
794    };
795    let stage = explain.stage();
796
797    // Plan ExplainConfig.
798    let mut config = {
799        let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
800
801        if !scx.catalog.system_vars().persist_stats_filter_enabled() {
802            // If filtering is disabled, explain plans should not include pushdown info.
803            with_options.filter_pushdown = Some(false);
804        }
805
806        ExplainConfig::try_from(with_options)?
807    };
808    config.verbose_syntax = verbose_syntax;
809
810    let explainee = plan_explainee(scx, explain.explainee, params)?;
811
812    Ok(Plan::ExplainPlan(ExplainPlanPlan {
813        stage,
814        format,
815        config,
816        explainee,
817    }))
818}
819
820pub fn plan_explain_schema(
821    scx: &StatementContext,
822    explain_schema: ExplainSinkSchemaStatement<Aug>,
823) -> Result<Plan, PlanError> {
824    let ExplainSinkSchemaStatement {
825        schema_for,
826        // Parser limits to JSON.
827        format: _,
828        mut statement,
829    } = explain_schema;
830
831    // Force the sink's name to one that's guaranteed not to exist, by virtue of
832    // being a non-existent item in a schema under the system's control, so that
833    // `plan_create_sink` doesn't complain about the name already existing.
834    statement.name = Some(UnresolvedItemName::qualified(&[
835        ident!("mz_catalog"),
836        ident!("mz_explain_schema"),
837    ]));
838
839    crate::pure::purify_create_sink_avro_doc_on_options(
840        scx.catalog,
841        *statement.from.item_id(),
842        &mut statement.format,
843    )?;
844
845    match ddl::plan_create_sink(scx, statement)? {
846        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
847            StorageSinkConnection::Kafka(KafkaSinkConnection {
848                format:
849                    KafkaSinkFormat {
850                        key_format,
851                        value_format:
852                            KafkaSinkFormatType::Avro {
853                                schema: value_schema,
854                                ..
855                            },
856                        ..
857                    },
858                ..
859            }) => {
860                let schema = match schema_for {
861                    ExplainSinkSchemaFor::Key => key_format
862                        .and_then(|f| match f {
863                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
864                            _ => None,
865                        })
866                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
867                    ExplainSinkSchemaFor::Value => value_schema,
868                };
869
870                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
871                    sink_from: sink.from,
872                    json_schema: schema,
873                }))
874            }
875            _ => bail_unsupported!(
876                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
877            ),
878        },
879        _ => bail_internal!("plan_sink did not produce a CreateSink plan"),
880    }
881}
882
883pub fn plan_explain_pushdown(
884    scx: &StatementContext,
885    statement: ExplainPushdownStatement<Aug>,
886    params: &Params,
887) -> Result<Plan, PlanError> {
888    scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
889    let explainee = plan_explainee(scx, statement.explainee, params)?;
890    Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
891}
892
/// Plans `EXPLAIN ANALYZE ... FOR <object>`.
///
/// Rather than producing a bespoke plan, this synthesizes the text of a SQL
/// query over `mz_introspection` relations (`mz_lir_mapping`,
/// `mz_mappable_objects`, and the per-worker statistics views) and then
/// either:
///
///   * returns the pretty-printed query text as a one-row result when
///     `AS SQL` was requested, or
///   * plans the synthesized query itself (via [`ShowSelect`]).
///
/// Only indexes and materialized views are supported explainees; anonymous
/// dataflows (explainees without a catalog name) are rejected.
pub fn plan_explain_analyze_object(
    scx: &StatementContext,
    statement: ExplainAnalyzeObjectStatement<Aug>,
    params: &Params,
) -> Result<Plan, PlanError> {
    // The object's fully-qualified name is matched against
    // `mz_mappable_objects.name` in the generated query below.
    let explainee_name = statement
        .explainee
        .name()
        .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
        .full_name_str();
    let explainee = plan_explainee(scx, statement.explainee, params)?;

    // Only indexes and materialized views map to dataflows we can analyze.
    match explainee {
        plan::Explainee::Index(_index_id) => (),
        plan::Explainee::MaterializedView(_item_id) => (),
        _ => {
            return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
        }
    };

    // generate SQL query

    /* WITH {CTEs}
       SELECT REPEAT(' ', nesting * 2) || operator AS operator
             {columns}
        FROM      mz_introspection.mz_lir_mapping mlm
             JOIN {from} USING (lir_id)
             JOIN mz_introspection.mz_mappable_objects mo
               ON (mlm.global_id = mo.global_id)
       WHERE     mo.name = {escaped explainee_name}
             AND {predicates}
       ORDER BY lir_id DESC
    */
    // Query fragments accumulated per requested property; the template
    // above shows where each list is spliced in.
    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
    let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
    let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
    let mut predicates = vec![format!(
        "mo.name = {}",
        escaped_string_literal(&explainee_name)
    )];
    let mut order_by = vec!["mlm.lir_id DESC"];

    match statement.properties {
        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
            properties,
            skew,
        }) => {
            // Set once by whichever skew-enabled property runs first; later
            // properties equate their worker_id column to it instead of
            // emitting a second worker_id output column.
            let mut worker_id = None;
            let mut seen_properties = BTreeSet::new();
            for property in properties {
                // handle each property only once (belt and suspenders)
                if !seen_properties.insert(property) {
                    continue;
                }

                match property {
                    ExplainAnalyzeComputationProperty::Memory => {
                        // Cross-worker totals and per-worker averages of
                        // arrangement size/records, per LIR operator. Each
                        // LIR operator covers the dataflow operator id range
                        // [operator_id_start, operator_id_end).
                        ctes.push((
                            "summary_memory",
                            r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         SUM(mas.size) AS total_memory,
         SUM(mas.records) AS total_records,
         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
                 ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                        ));
                        // LEFT JOIN: operators without arrangements still show up.
                        from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");

                        if skew {
                            // Per-worker size/records, per LIR operator.
                            ctes.push((
                                "per_worker_memory",
                                r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         mas.worker_id AS worker_id,
         SUM(mas.size) AS worker_memory,
         SUM(mas.records) AS worker_records
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
                 ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                            ));
                            from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");

                            // Either align with the worker_id column another
                            // property already exposed, or expose ours.
                            if let Some(worker_id) = worker_id {
                                predicates.push(format!("pwm.worker_id = {worker_id}"));
                            } else {
                                worker_id = Some("pwm.worker_id");
                                columns.push("pwm.worker_id AS worker_id");
                                order_by.push("worker_id");
                            }

                            // Skew ratios (worker vs. cross-worker average),
                            // guarded against NULL workers and zero averages.
                            columns.extend([
                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
                                "pg_size_pretty(pwm.worker_memory) AS worker_memory",
                                "pg_size_pretty(sm.avg_memory) AS avg_memory",
                                "pg_size_pretty(sm.total_memory) AS total_memory",
                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
                                "pwm.worker_records AS worker_records",
                                "sm.avg_records AS avg_records",
                                "sm.total_records AS total_records",
                            ]);
                        } else {
                            // No skew requested: totals only.
                            columns.extend([
                                "pg_size_pretty(sm.total_memory) AS total_memory",
                                "sm.total_records AS total_records",
                            ]);
                        }
                    }
                    ExplainAnalyzeComputationProperty::Cpu => {
                        // Cross-worker total and per-worker average of
                        // elapsed scheduling nanoseconds, per LIR operator.
                        ctes.push((
                            "summary_cpu",
                            r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         SUM(mse.elapsed_ns) AS total_ns,
         CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
                 ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                        ));
                        from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");

                        if skew {
                            // Per-worker elapsed nanoseconds, per LIR operator.
                            ctes.push((
                                "per_worker_cpu",
                                r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         mse.worker_id AS worker_id,
         SUM(mse.elapsed_ns) AS worker_ns
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
                 ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                            ));
                            from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");

                            // Share the worker_id column with other
                            // skew-enabled properties (see above).
                            if let Some(worker_id) = worker_id {
                                predicates.push(format!("pwc.worker_id = {worker_id}"));
                            } else {
                                worker_id = Some("pwc.worker_id");
                                columns.push("pwc.worker_id AS worker_id");
                                order_by.push("worker_id");
                            }

                            // ns -> INTERVAL via microseconds for display.
                            columns.extend([
                                "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
                                "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
                                "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
                            ]);
                        }
                        // Total elapsed time is shown with or without skew.
                        columns.push(
                            "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
                        );
                    }
                }
            }
        }
        ExplainAnalyzeProperty::Hints => {
            // Expected-group-size advice per LIR operator, joined through
            // the dataflow id (not the per-operator id range directly).
            columns.extend([
                "megsa.levels AS levels",
                "megsa.to_cut AS to_cut",
                "megsa.hint AS hint",
                "pg_size_pretty(megsa.savings) AS savings",
            ]);
            from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
            "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
             mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
        }
    }

    // Restrict to mappable objects; the `mo.name = ...` predicate added
    // above refers to this alias.
    from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");

    // Render the accumulated fragments into the final query text.
    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    let predicates = separated(" AND ", predicates);
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
WHERE {predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // `AS SQL`: return the pretty-printed query text itself as a
        // single string row rather than executing it.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        // Otherwise plan the synthesized query as if the user had typed it.
        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        show_select.plan()
    }
}
1115
1116pub fn plan_explain_analyze_cluster(
1117    scx: &StatementContext,
1118    statement: ExplainAnalyzeClusterStatement,
1119    _params: &Params,
1120) -> Result<Plan, PlanError> {
1121    // object string
1122    // worker_id uint64        (if           skew)
1123    // memory_ratio numeric    (if memory && skew)
1124    // worker_memory string    (if memory && skew)
1125    // avg_memory string       (if memory && skew)
1126    // total_memory string     (if memory)
1127    // records_ratio numeric   (if memory && skew)
1128    // worker_records          (if memory && skew)
1129    // avg_records numeric     (if memory && skew)
1130    // total_records numeric   (if memory)
1131    // cpu_ratio numeric       (if cpu    && skew)
1132    // worker_elapsed interval (if cpu    && skew)
1133    // avg_elapsed interval    (if cpu    && skew)
1134    // total_elapsed interval  (if cpu)
1135
1136    /* WITH {CTEs}
1137       SELECT mo.name AS object
1138             {columns}
1139        FROM mz_introspection.mz_mappable_objects mo
1140             {from}
1141       WHERE {predicates}
1142       ORDER BY {order_by}, mo.name DESC
1143    */
1144    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
1145    let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
1146    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
1147    let mut predicates = vec![];
1148    let mut order_by = vec![];
1149
1150    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
1151    let mut worker_id = None;
1152    let mut seen_properties = BTreeSet::new();
1153    for property in properties {
1154        // handle each property only once (belt and suspenders)
1155        if !seen_properties.insert(property) {
1156            continue;
1157        }
1158
1159        match property {
1160            ExplainAnalyzeComputationProperty::Memory => {
1161                if skew {
1162                    let mut set_worker_id = false;
1163                    if let Some(worker_id) = worker_id {
1164                        // join condition if we're showing skew for more than one property
1165                        predicates.push(format!("om.worker_id = {worker_id}"));
1166                    } else {
1167                        worker_id = Some("om.worker_id");
1168                        columns.push("om.worker_id AS worker_id");
1169                        set_worker_id = true; // we'll add ourselves to `order_by` later
1170                    };
1171
1172                    // computes the average memory per LIR operator (for per operator ratios)
1173                    ctes.push((
1174                    "per_operator_memory_summary",
1175                    r#"
1176SELECT mlm.global_id AS global_id,
1177       mlm.lir_id AS lir_id,
1178       SUM(mas.size) AS total_memory,
1179       SUM(mas.records) AS total_records,
1180       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
1181       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
1182FROM        mz_introspection.mz_lir_mapping mlm
1183 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1184       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1185         ON (mas.operator_id = valid_id)
1186GROUP BY mlm.global_id, mlm.lir_id"#,
1187                ));
1188
1189                    // computes the memory per worker in a per operator way
1190                    ctes.push((
1191                    "per_operator_memory_per_worker",
1192                    r#"
1193SELECT mlm.global_id AS global_id,
1194       mlm.lir_id AS lir_id,
1195       mas.worker_id AS worker_id,
1196       SUM(mas.size) AS worker_memory,
1197       SUM(mas.records) AS worker_records
1198FROM        mz_introspection.mz_lir_mapping mlm
1199 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1200       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1201         ON (mas.operator_id = valid_id)
1202GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
1203                    ));
1204
1205                    // computes memory ratios per worker per operator
1206                    ctes.push((
1207                    "per_operator_memory_ratios",
1208                    r#"
1209SELECT pompw.global_id AS global_id,
1210       pompw.lir_id AS lir_id,
1211       pompw.worker_id AS worker_id,
1212       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
1213       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
1214  FROM      per_operator_memory_per_worker pompw
1215       JOIN per_operator_memory_summary poms
1216         USING (global_id, lir_id)
1217"#,
1218                    ));
1219
1220                    // summarizes each object, per worker
1221                    ctes.push((
1222                        "object_memory",
1223                        r#"
1224SELECT pompw.global_id AS global_id,
1225       pompw.worker_id AS worker_id,
1226       MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
1227       MAX(pomr.records_ratio) AS max_operator_records_ratio,
1228       SUM(pompw.worker_memory) AS worker_memory,
1229       SUM(pompw.worker_records) AS worker_records
1230FROM        per_operator_memory_per_worker pompw
1231     JOIN   per_operator_memory_ratios pomr
1232     USING (global_id, worker_id, lir_id)
1233GROUP BY pompw.global_id, pompw.worker_id
1234"#,
1235                    ));
1236
1237                    // summarizes each worker
1238                    ctes.push(("object_average_memory", r#"
1239SELECT om.global_id AS global_id,
1240       SUM(om.worker_memory) AS total_memory,
1241       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
1242       SUM(om.worker_records) AS total_records,
1243       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
1244  FROM object_memory om
1245GROUP BY om.global_id"#));
1246
1247                    from.push("LEFT JOIN object_memory om USING (global_id)");
1248                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");
1249
1250                    columns.extend([
1251                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
1252                        "pg_size_pretty(om.worker_memory) AS worker_memory",
1253                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
1254                        "pg_size_pretty(oam.total_memory) AS total_memory",
1255                        "om.max_operator_records_ratio AS max_operator_records_ratio",
1256                        "om.worker_records AS worker_records",
1257                        "oam.avg_records AS avg_records",
1258                        "oam.total_records AS total_records",
1259                    ]);
1260
1261                    order_by.extend([
1262                        "max_operator_memory_ratio DESC",
1263                        "max_operator_records_ratio DESC",
1264                        "om.worker_memory DESC",
1265                        "worker_records DESC",
1266                    ]);
1267
1268                    if set_worker_id {
1269                        order_by.push("worker_id");
1270                    }
1271                } else {
1272                    // no skew, so just compute totals
1273                    ctes.push((
1274                        "per_operator_memory_totals",
1275                        r#"
1276    SELECT mlm.global_id AS global_id,
1277           mlm.lir_id AS lir_id,
1278           SUM(mas.size) AS total_memory,
1279           SUM(mas.records) AS total_records
1280    FROM        mz_introspection.mz_lir_mapping mlm
1281     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1282           JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1283             ON (mas.operator_id = valid_id)
1284    GROUP BY mlm.global_id, mlm.lir_id"#,
1285                    ));
1286
1287                    ctes.push((
1288                        "object_memory_totals",
1289                        r#"
1290SELECT pomt.global_id AS global_id,
1291       SUM(pomt.total_memory) AS total_memory,
1292       SUM(pomt.total_records) AS total_records
1293FROM per_operator_memory_totals pomt
1294GROUP BY pomt.global_id
1295"#,
1296                    ));
1297
1298                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
1299                    columns.extend([
1300                        "pg_size_pretty(omt.total_memory) AS total_memory",
1301                        "omt.total_records AS total_records",
1302                    ]);
1303                    order_by.extend(["omt.total_memory DESC", "total_records DESC"]);
1304                }
1305            }
1306            ExplainAnalyzeComputationProperty::Cpu => {
1307                if skew {
1308                    let mut set_worker_id = false;
1309                    if let Some(worker_id) = worker_id {
1310                        // join condition if we're showing skew for more than one property
1311                        predicates.push(format!("oc.worker_id = {worker_id}"));
1312                    } else {
1313                        worker_id = Some("oc.worker_id");
1314                        columns.push("oc.worker_id AS worker_id");
1315                        set_worker_id = true; // we'll add ourselves to `order_by` later
1316                    };
1317
1318                    // computes the average memory per LIR operator (for per operator ratios)
1319                    ctes.push((
1320    "per_operator_cpu_summary",
1321    r#"
1322SELECT mlm.global_id AS global_id,
1323       mlm.lir_id AS lir_id,
1324       SUM(mse.elapsed_ns) AS total_ns,
1325       CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
1326FROM       mz_introspection.mz_lir_mapping mlm
1327CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1328      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1329        ON (mse.id = valid_id)
1330GROUP BY mlm.global_id, mlm.lir_id"#,
1331));
1332
1333                    // computes the CPU per worker in a per operator way
1334                    ctes.push((
1335                        "per_operator_cpu_per_worker",
1336                        r#"
1337SELECT mlm.global_id AS global_id,
1338       mlm.lir_id AS lir_id,
1339       mse.worker_id AS worker_id,
1340       SUM(mse.elapsed_ns) AS worker_ns
1341FROM       mz_introspection.mz_lir_mapping mlm
1342CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1343      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1344        ON (mse.id = valid_id)
1345GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1346                    ));
1347
1348                    // computes CPU ratios per worker per operator
1349                    ctes.push((
1350                        "per_operator_cpu_ratios",
1351                        r#"
1352SELECT pocpw.global_id AS global_id,
1353       pocpw.lir_id AS lir_id,
1354       pocpw.worker_id AS worker_id,
1355       CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
1356FROM      per_operator_cpu_per_worker pocpw
1357     JOIN per_operator_cpu_summary pocs
1358     USING (global_id, lir_id)
1359"#,
1360                    ));
1361
1362                    // summarizes each object, per worker
1363                    ctes.push((
1364                        "object_cpu",
1365                        r#"
1366SELECT pocpw.global_id AS global_id,
1367       pocpw.worker_id AS worker_id,
1368       MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
1369       SUM(pocpw.worker_ns) AS worker_ns
1370FROM      per_operator_cpu_per_worker pocpw
1371     JOIN per_operator_cpu_ratios pomr
1372     USING (global_id, worker_id, lir_id)
1373GROUP BY pocpw.global_id, pocpw.worker_id
1374"#,
1375                    ));
1376
1377                    // summarizes each worker
1378                    ctes.push((
1379                        "object_average_cpu",
1380                        r#"
1381SELECT oc.global_id AS global_id,
1382       SUM(oc.worker_ns) AS total_ns,
1383       CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
1384  FROM object_cpu oc
1385GROUP BY oc.global_id"#,));
1386
1387                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
1388                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");
1389
1390                    columns.extend([
1391                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
1392                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
1393                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
1394                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
1395                    ]);
1396
1397                    order_by.extend(["max_operator_cpu_ratio DESC", "worker_elapsed DESC"]);
1398
1399                    if set_worker_id {
1400                        order_by.push("worker_id");
1401                    }
1402                } else {
1403                    // no skew, so just compute totals
1404                    ctes.push((
1405                        "per_operator_cpu_totals",
1406                        r#"
1407    SELECT mlm.global_id AS global_id,
1408           mlm.lir_id AS lir_id,
1409           SUM(mse.elapsed_ns) AS total_ns
1410    FROM        mz_introspection.mz_lir_mapping mlm
1411     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1412           JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1413             ON (mse.id = valid_id)
1414    GROUP BY mlm.global_id, mlm.lir_id"#,
1415                    ));
1416
1417                    ctes.push((
1418                        "object_cpu_totals",
1419                        r#"
1420SELECT poct.global_id AS global_id,
1421       SUM(poct.total_ns) AS total_ns
1422FROM per_operator_cpu_totals poct
1423GROUP BY poct.global_id
1424"#,
1425                    ));
1426
1427                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
1428                    columns
1429                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
1430                    order_by.extend(["total_elapsed DESC"]);
1431                }
1432            }
1433        }
1434    }
1435
1436    // generate SQL query text
1437    let ctes = if !ctes.is_empty() {
1438        format!(
1439            "WITH {}",
1440            separated(
1441                ",\n",
1442                ctes.iter()
1443                    .map(|(name, defn)| format!("{name} AS ({defn})"))
1444            )
1445        )
1446    } else {
1447        String::new()
1448    };
1449    let columns = separated(", ", columns);
1450    let from = separated(" ", from);
1451    let predicates = if !predicates.is_empty() {
1452        format!("WHERE {}", separated(" AND ", predicates))
1453    } else {
1454        String::new()
1455    };
1456    // add mo.name last, to break ties only
1457    order_by.push("mo.name DESC");
1458    let order_by = separated(", ", order_by);
1459    let query = format!(
1460        r#"{ctes}
1461SELECT {columns}
1462FROM {from}
1463{predicates}
1464ORDER BY {order_by}"#
1465    );
1466
1467    if statement.as_sql {
1468        let rows = vec![Row::pack_slice(&[Datum::String(
1469            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1470                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1471            })?,
1472        )])];
1473        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1474
1475        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1476    } else {
1477        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1478        show_select.plan()
1479    }
1480}
1481
1482pub fn plan_explain_timestamp(
1483    scx: &StatementContext,
1484    explain: ExplainTimestampStatement<Aug>,
1485) -> Result<Plan, PlanError> {
1486    let (format, _verbose_syntax) = match explain.format() {
1487        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1488        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1489        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1490        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1491    };
1492
1493    let raw_plan = {
1494        let query::PlannedRootQuery {
1495            expr: raw_plan,
1496            desc: _,
1497            finishing: _,
1498            scope: _,
1499        } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1500        if raw_plan.contains_parameters()? {
1501            return Err(PlanError::ParameterNotAllowed(
1502                "EXPLAIN TIMESTAMP".to_string(),
1503            ));
1504        }
1505
1506        raw_plan
1507    };
1508    let when = query::plan_as_of(scx, explain.select.as_of)?;
1509
1510    Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1511        format,
1512        raw_plan,
1513        when,
1514    }))
1515}
1516
// Extracts the `WITH (...)` options accepted by `SUBSCRIBE`: `SNAPSHOT` and
// `PROGRESS`, both optional booleans (defaults applied at the use sites).
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1518
/// Computes the `StatementDesc` (output column names and types) produced by a
/// `SUBSCRIBE` statement.
///
/// Every subscribe output starts with an `mz_timestamp` column; depending on
/// the options and output style it gains `mz_progressed`, `mz_diff`, or
/// `mz_state` columns ahead of the relation's own columns. The exact column
/// order produced here must match what the subscribe dataflow emits.
///
/// # Errors
///
/// Fails if the subscribed item has no relation description (e.g. it is not a
/// relational object), if the inner query fails to plan, or if an
/// `ENVELOPE ... KEY` references an unknown column.
pub fn describe_subscribe(
    scx: &StatementContext,
    stmt: SubscribeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // Resolve the description of the underlying relation: either a named
    // catalog item or an inline query.
    let relation_desc = match stmt.relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            match item.relation_desc() {
                Some(desc) => desc.into_owned(),
                None => sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                ),
            }
        }
        SubscribeRelation::Query(query) => {
            let query::PlannedRootQuery { desc, .. } =
                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
            desc
        }
    };
    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
    let progress = progress.unwrap_or(false);
    // The leading `mz_timestamp` column is a scale-0 numeric and never null.
    let mut desc = RelationDesc::builder().with_column(
        "mz_timestamp",
        SqlScalarType::Numeric {
            max_scale: Some(NumericMaxScale::ZERO),
        }
        .nullable(false),
    );
    if progress {
        desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
    }

    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
    match stmt.output {
        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
            desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
            for (name, mut ty) in relation_desc.into_iter() {
                // Progress messages carry no data, so with PROGRESS every
                // data column must be nullable.
                if progress {
                    ty.nullable = true;
                }
                desc = desc.with_column(name, ty);
            }
        }
        SubscribeOutput::EnvelopeUpsert { key_columns }
        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
            desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
            let key_columns = key_columns
                .into_iter()
                .map(normalize::column_name)
                .collect_vec();
            let mut before_values_desc = RelationDesc::builder();
            let mut after_values_desc = RelationDesc::builder();

            // Add the key columns in the order that they're specified.
            for column_name in &key_columns {
                let mut column_ty = relation_desc
                    .get_by_name(column_name)
                    .map(|(_pos, ty)| ty.clone())
                    .ok_or_else(|| PlanError::UnknownColumn {
                        table: None,
                        column: column_name.clone(),
                        similar: Box::new([]),
                    })?;
                // As above: key columns become nullable when PROGRESS is set.
                if progress {
                    column_ty.nullable = true;
                }
                desc = desc.with_column(column_name, column_ty);
            }

            // Then add the remaining columns in the order from the original
            // table, filtering out the key columns since we added those above.
            for (mut name, mut ty) in relation_desc
                .into_iter()
                .filter(|(name, _ty)| !key_columns.contains(name))
            {
                // Value columns are always nullable: they are null on
                // deletions (and in progress messages).
                ty.nullable = true;
                before_values_desc =
                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
                // Debezium output distinguishes before/after images, so the
                // "after" copy gets a prefixed name; upsert output keeps the
                // original column name.
                if debezium {
                    name = format!("after_{}", name).into();
                }
                after_values_desc = after_values_desc.with_column(name, ty);
            }

            // Only Debezium emits the `before_*` image columns.
            if debezium {
                desc = desc.concat(before_values_desc);
            }
            desc = desc.concat(after_values_desc);
        }
    }
    Ok(StatementDesc::new(Some(desc.finish())))
}
1614
/// Plans a `SUBSCRIBE` statement into a [`SubscribePlan`].
///
/// Resolves the subscribed relation (named item or inline query), plans the
/// `AS OF` / `UP TO` bounds, and plans the requested output style
/// (plain diffs, `ENVELOPE UPSERT`, `ENVELOPE DEBEZIUM`, or
/// `WITHIN TIMESTAMP ORDER BY`).
///
/// `copy_to` is `Some` when this subscribe is the body of a
/// `COPY ... TO STDOUT`, carrying the requested copy format.
///
/// # Errors
///
/// Fails when the named item is not relational, when a key/order-by clause
/// references unknown columns or non-column expressions, or when a gated
/// output style is used without its feature flag.
pub fn plan_subscribe(
    scx: &StatementContext,
    SubscribeStatement {
        relation,
        options,
        as_of,
        up_to,
        output,
    }: SubscribeStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<Plan, PlanError> {
    // Resolve what we subscribe to, along with its description and a scope
    // for planning key/order-by expressions below.
    let (from, desc, scope) = match relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            let Some(desc) = item.relation_desc() else {
                sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                );
            };
            let item_name = match name {
                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
                _ => None,
            };
            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
            (
                SubscribeFrom::Id(item.global_id()),
                desc.into_owned(),
                scope,
            )
        }
        SubscribeRelation::Query(query) => {
            #[allow(deprecated)] // TODO(aalexandrov): Use HirRelationExpr in Subscribe
            let query::PlannedRootQuery {
                mut expr,
                desc,
                finishing,
                scope,
            } = query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
            expr.bind_parameters(scx, QueryLifetime::Subscribe, params)?;
            let query = query::PlannedRootQuery {
                expr,
                desc,
                finishing,
                scope,
            };
            // There's no way to apply finishing operations to a `SUBSCRIBE` directly, so the
            // finishing should have already been turned into a `TopK` by
            // `plan_query` / `plan_root_query`, upon seeing the `QueryLifetime::Subscribe`.
            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
                &query.finishing,
                query.desc.arity()
            ));
            let desc = query.desc.clone();
            (
                SubscribeFrom::Query {
                    expr: query.expr,
                    desc: query.desc,
                },
                desc,
                query.scope,
            )
        }
    };

    // Plan the temporal bounds: `AS OF` (start) and optional `UP TO` (end).
    let when = query::plan_as_of(scx, as_of)?;
    let up_to = up_to
        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
        .transpose()?;

    // Expression context for planning key / order-by clauses against the
    // subscribed relation's columns.
    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
    let ecx = ExprContext {
        qcx: &qcx,
        name: "",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
    let output = match output {
        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
        SubscribeOutput::EnvelopeUpsert { key_columns } => {
            // Key columns are planned as bare ORDER BY identifiers; any
            // non-trivial expression would show up as a map expression and
            // is rejected below.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE UPSERT KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
            }
            plan::SubscribeOutput::EnvelopeUpsert {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::EnvelopeDebezium { key_columns } => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
            // Same key-planning scheme as ENVELOPE UPSERT above.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE DEBEZIUM KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
            }
            plan::SubscribeOutput::EnvelopeDebezium {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
            // `mz_diff` is orderable here; prepend it as synthetic column 0
            // and shift the real columns by one.
            let mz_diff = "mz_diff".into();
            let output_columns = std::iter::once((0, &mz_diff))
                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
                .collect_vec();
            match query::plan_order_by_exprs(
                &ExprContext {
                    name: "WITHIN TIMESTAMP ORDER BY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            ) {
                Err(PlanError::UnknownColumn {
                    table: None,
                    column,
                    similar: _,
                }) if &column == &mz_diff => {
                    // mz_diff is being used in an expression. Since mz_diff isn't part of the table
                    // it looks like an unknown column. Instead, return a better error
                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                }
                Err(e) => return Err(e),
                Ok((order_by, map_exprs)) => {
                    if !map_exprs.is_empty() {
                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                    }

                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
                }
            }
        }
    };

    // Apply option defaults: snapshot on, progress off.
    let SubscribeOptionExtracted {
        progress, snapshot, ..
    } = options.try_into()?;
    Ok(Plan::Subscribe(SubscribePlan {
        from,
        when,
        up_to,
        with_snapshot: snapshot.unwrap_or(true),
        copy_to,
        emit_progress: progress.unwrap_or(false),
        output,
    }))
}
1799
1800pub fn describe_copy_from_table(
1801    scx: &StatementContext,
1802    table_name: <Aug as AstInfo>::ItemName,
1803    columns: Vec<Ident>,
1804) -> Result<StatementDesc, PlanError> {
1805    let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1806    Ok(StatementDesc::new(Some(desc)))
1807}
1808
1809pub fn describe_copy_item(
1810    scx: &StatementContext,
1811    object_name: <Aug as AstInfo>::ItemName,
1812    columns: Vec<Ident>,
1813) -> Result<StatementDesc, PlanError> {
1814    let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1815    Ok(StatementDesc::new(Some(desc)))
1816}
1817
1818pub fn describe_copy(
1819    scx: &StatementContext,
1820    CopyStatement {
1821        relation,
1822        direction,
1823        ..
1824    }: CopyStatement<Aug>,
1825) -> Result<StatementDesc, PlanError> {
1826    Ok(match (relation, direction) {
1827        (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1828            describe_copy_item(scx, name, columns)?
1829        }
1830        (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1831            describe_copy_from_table(scx, name, columns)?
1832        }
1833        (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1834        (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1835    }
1836    .with_is_copy())
1837}
1838
1839fn plan_copy_to_expr(
1840    scx: &StatementContext,
1841    select_plan: SelectPlan,
1842    desc: RelationDesc,
1843    to: &Expr<Aug>,
1844    format: CopyFormat,
1845    options: CopyOptionExtracted,
1846) -> Result<Plan, PlanError> {
1847    let conn_id = match options.aws_connection {
1848        Some(conn_id) => CatalogItemId::from(conn_id),
1849        None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1850    };
1851    let connection = scx.get_item(&conn_id).connection()?;
1852
1853    match connection {
1854        mz_storage_types::connections::Connection::Aws(_) => {}
1855        _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1856    }
1857
1858    let format = match format {
1859        CopyFormat::Csv => {
1860            let quote = extract_byte_param_value(options.quote, "quote")?;
1861            let escape = extract_byte_param_value(options.escape, "escape")?;
1862            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1863            S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1864                CopyCsvFormatParams::try_new(
1865                    delimiter,
1866                    quote,
1867                    escape,
1868                    options.header,
1869                    options.null,
1870                )
1871                .map_err(|e| sql_err!("{}", e))?,
1872            ))
1873        }
1874        CopyFormat::Parquet => {
1875            // Validate that the output desc can be formatted as parquet.
1876            // COPY TO does not apply any type overrides, so pass `|_| None`.
1877            ArrowBuilder::validate_desc_for_parquet(&desc, |_| None)
1878                .map_err(|e| sql_err!("{}", e))?;
1879            S3SinkFormat::Parquet
1880        }
1881        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1882        CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1883    };
1884
1885    // Converting the to expr to a HirScalarExpr
1886    let mut to_expr = to.clone();
1887    transform_ast::transform(scx, &mut to_expr)?;
1888    let relation_type = RelationDesc::empty();
1889    let ecx = &ExprContext {
1890        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1891        name: "COPY TO target",
1892        scope: &Scope::empty(),
1893        relation_type: relation_type.typ(),
1894        allow_aggregates: false,
1895        allow_subqueries: false,
1896        allow_parameters: false,
1897        allow_windows: false,
1898    };
1899
1900    let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;
1901
1902    if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1903        sql_bail!(
1904            "MAX FILE SIZE cannot be less than {}",
1905            MIN_S3_SINK_FILE_SIZE
1906        );
1907    }
1908    if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1909        sql_bail!(
1910            "MAX FILE SIZE cannot be greater than {}",
1911            MAX_S3_SINK_FILE_SIZE
1912        );
1913    }
1914
1915    Ok(Plan::CopyTo(CopyToPlan {
1916        select_plan,
1917        desc,
1918        to,
1919        connection: connection.to_owned(),
1920        connection_id: conn_id,
1921        format,
1922        max_file_size: options.max_file_size.as_bytes(),
1923    }))
1924}
1925
/// Plans a `COPY ... FROM` statement into a [`CopyFromPlan`].
///
/// The data source may be `STDIN`, a URL expression, or an S3 location (URL
/// expression plus an AWS connection). The format may be text, CSV, or
/// parquet; binary is unsupported. `FILES`/`PATTERN` filters are only valid
/// for non-`STDIN` sources.
///
/// # Errors
///
/// Fails on CSV-only options used outside CSV mode, unsupported targets or
/// formats, conflicting `FILES`/`PATTERN` options, or when the target table
/// cannot be planned for a copy.
fn plan_copy_from(
    scx: &StatementContext,
    target: &CopyTarget<Aug>,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // Rejects options that PostgreSQL only allows in CSV mode.
    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
        match option {
            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
            None => Ok(()),
        }
    }

    // Resolve where the data comes from.
    let source = match target {
        CopyTarget::Stdin => CopyFromSource::Stdin,
        CopyTarget::Expr(from) => {
            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;

            // Converting the expr to an HirScalarExpr
            let mut from_expr = from.clone();
            transform_ast::transform(scx, &mut from_expr)?;
            let relation_type = RelationDesc::empty();
            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "COPY FROM target",
                scope: &Scope::empty(),
                relation_type: relation_type.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;

            // With an AWS connection the expression is an S3 URI; without
            // one it is treated as a plain URL.
            match options.aws_connection {
                Some(conn_id) => {
                    let conn_id = CatalogItemId::from(conn_id);

                    // Validate the connection type is one we expect.
                    let connection = match scx.get_item(&conn_id).connection()? {
                        mz_storage_types::connections::Connection::Aws(conn) => conn,
                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
                    };

                    CopyFromSource::AwsS3 {
                        uri: from,
                        connection,
                        connection_id: conn_id,
                    }
                }
                None => CopyFromSource::Url(from),
            }
        }
        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
    };

    // Translate the FORMAT plus its options into decoding parameters.
    let params = match format {
        CopyFormat::Text => {
            only_available_with_csv(options.quote, "quote")?;
            only_available_with_csv(options.escape, "escape")?;
            only_available_with_csv(options.header, "HEADER")?;
            // Defaults match PostgreSQL's text format: tab delimiter, `\N`
            // for NULL.
            let delimiter =
                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
            let null = match options.null {
                Some(null) => Cow::from(null),
                None => Cow::from("\\N"),
            };
            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
        }
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            )
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Parquet => CopyFormatParams::Parquet,
    };

    // FILES and PATTERN are mutually exclusive object filters.
    let filter = match (options.files, options.pattern) {
        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
        (Some(files), None) => Some(CopyFromFilter::Files(files)),
        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
        (None, None) => None,
    };

    // Filters only make sense when listing remote objects, not for STDIN.
    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
    }

    let table_name_string = table_name.full_name_str();

    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;

    let Some(mfp) = maybe_mfp else {
        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
    };

    Ok(Plan::CopyFrom(CopyFromPlan {
        target_id: id,
        target_name: table_name_string,
        source,
        columns,
        source_desc,
        mfp,
        params,
        filter,
    }))
}
2046
2047fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2048    match v {
2049        Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2050        Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2051        None => Ok(None),
2052    }
2053}
2054
// Extracts the `WITH (...)` options accepted by `COPY`. Format-related
// options (DELIMITER, NULL, ESCAPE, QUOTE, HEADER) are validated later per
// format; AWS CONNECTION, MAX FILE SIZE, FILES, and PATTERN apply to the
// S3 copy paths. MAX FILE SIZE defaults to 256 MB.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
2068
/// Plans a `COPY` statement by dispatching on its direction and target.
///
/// - `COPY ... TO STDOUT` plans the inner `SELECT`/`SUBSCRIBE` with a copy
///   format attached.
/// - `COPY ... FROM ...` plans a [`CopyFromPlan`] via `plan_copy_from`.
/// - `COPY ... TO <expr>` plans a [`CopyToPlan`] (S3 sink) via
///   `plan_copy_to_expr`.
///
/// # Errors
///
/// Fails on unknown formats, unsupported direction/target combinations, and
/// options not supported by the chosen path.
pub fn plan_copy(
    scx: &StatementContext,
    CopyStatement {
        relation,
        direction,
        target,
        options,
    }: CopyStatement<Aug>,
) -> Result<Plan, PlanError> {
    let options = CopyOptionExtracted::try_from(options)?;
    // Parse any user-provided FORMAT option. If not provided, will default to
    // Text for COPY TO STDOUT and COPY FROM STDIN, but will error for COPY TO <expr>.
    let format = options
        .format
        .as_ref()
        .map(|format| match format.to_lowercase().as_str() {
            "text" => Ok(CopyFormat::Text),
            "csv" => Ok(CopyFormat::Csv),
            "binary" => Ok(CopyFormat::Binary),
            "parquet" => Ok(CopyFormat::Parquet),
            _ => sql_bail!("unknown FORMAT: {}", format),
        })
        .transpose()?;

    match (&direction, &target) {
        (CopyDirection::To, CopyTarget::Stdout) => {
            // These options are only meaningful for format encoding, which
            // the STDOUT path doesn't support yet.
            if options.delimiter.is_some() {
                sql_bail!("COPY TO does not support DELIMITER option yet");
            }
            if options.quote.is_some() {
                sql_bail!("COPY TO does not support QUOTE option yet");
            }
            if options.null.is_some() {
                sql_bail!("COPY TO does not support NULL option yet");
            }
            match relation {
                CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
                CopyRelation::Select(stmt) => Ok(plan_select(
                    scx,
                    stmt,
                    &Params::empty(),
                    Some(format.unwrap_or(CopyFormat::Text)),
                )?),
                CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
                    scx,
                    stmt,
                    &Params::empty(),
                    Some(format.unwrap_or(CopyFormat::Text)),
                )?),
            }
        }
        (CopyDirection::From, target) => match relation {
            CopyRelation::Named { name, columns } => plan_copy_from(
                scx,
                target,
                name,
                columns,
                format.unwrap_or(CopyFormat::Text),
                options,
            ),
            _ => sql_bail!("COPY FROM {} not supported", target),
        },
        (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
            // System users are always allowed to use this feature, even when
            // the flag is disabled, so that we can dogfood for analytics in
            // production environments. The feature is stable enough that we're
            // not worried about it crashing.
            if !scx.catalog.active_role_id().is_system() {
                scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
            }

            // Unlike the STDOUT path, there is no default format here.
            let format = match format {
                Some(inner) => inner,
                _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
            };

            let stmt = match relation {
                CopyRelation::Named { name, columns } => {
                    if !columns.is_empty() {
                        // TODO(mouli): Add support for this
                        sql_bail!(
                            "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
                        );
                    }
                    // Generate a synthetic SELECT query that just gets the table
                    let query = Query {
                        ctes: CteBlock::empty(),
                        body: SetExpr::Table(name),
                        order_by: vec![],
                        limit: None,
                        offset: None,
                    };
                    SelectStatement { query, as_of: None }
                }
                CopyRelation::Select(stmt) => {
                    if !stmt.query.order_by.is_empty() {
                        sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
                    }
                    stmt
                }
                CopyRelation::Subscribe(_) => {
                    sql_bail!("COPY {} {} not supported", direction, target)
                }
            };

            // Plan the query once, then hand both the plan and its
            // description to the S3 sink planner.
            let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
            plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
        }
        _ => sql_bail!("COPY {} {} not supported", direction, target),
    }
}