Skip to main content

mz_sql/plan/statement/
dml.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data manipulation language (DML).
11//!
12//! This module houses the handlers for statements that manipulate data, like
13//! `INSERT`, `SELECT`, `SUBSCRIBE`, and `COPY`.
14
15use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19use mz_arrow_util::builder::ArrowBuilder;
20use mz_expr::RowSetFinishing;
21use mz_ore::num::NonNeg;
22use mz_ore::soft_panic_or_log;
23use mz_ore::str::separated;
24use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
25use mz_repr::adt::numeric::NumericMaxScale;
26use mz_repr::bytes::ByteSize;
27use mz_repr::explain::{ExplainConfig, ExplainFormat};
28use mz_repr::optimize::OptimizerFeatureOverrides;
29use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
30use mz_sql_parser::ast::{
31    CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
32    ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
33    ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
34    ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
35    SetExpr, SubscribeOutput, UnresolvedItemName,
36};
37use mz_sql_parser::ident;
38use mz_storage_types::sinks::{
39    KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
40    MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
41};
42
43use crate::ast::display::{AstDisplay, escaped_string_literal};
44use crate::ast::{
45    AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
46    DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
47    SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
48    UpdateStatement,
49};
50use crate::catalog::CatalogItemType;
51use crate::names::{Aug, ResolvedItemName};
52use crate::normalize;
53use crate::plan::query::{
54    ExprContext, QueryLifetime, negative_offset_error, offset_into_value, plan_as_of_or_up_to,
55    plan_expr,
56};
57use crate::plan::scope::Scope;
58use crate::plan::statement::show::ShowSelect;
59use crate::plan::statement::{StatementContext, StatementDesc, ddl};
60use crate::plan::{
61    self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
62    ExplainTimestampPlan, HirRelationExpr, side_effecting_func, transform_ast,
63};
64use crate::plan::{
65    CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
66    QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
67};
68use crate::plan::{CopyFromSource, with_options};
69use crate::session::vars::{self, DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF};
70
71// TODO(benesch): currently, describing a `SELECT` or `INSERT` query
72// plans the whole query to determine its shape and parameter types,
73// and then throws away that plan. If we were smarter, we'd stash that
74// plan somewhere so we don't have to recompute it when the query is
75// executed.
76
77pub fn describe_insert(
78    scx: &StatementContext,
79    InsertStatement {
80        table_name,
81        columns,
82        source,
83        returning,
84    }: InsertStatement<Aug>,
85) -> Result<StatementDesc, PlanError> {
86    let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
87    let desc = if returning.expr.is_empty() {
88        None
89    } else {
90        Some(returning.desc)
91    };
92    Ok(StatementDesc::new(desc))
93}
94
95pub fn plan_insert(
96    scx: &StatementContext,
97    InsertStatement {
98        table_name,
99        columns,
100        source,
101        returning,
102    }: InsertStatement<Aug>,
103    params: &Params,
104) -> Result<Plan, PlanError> {
105    let (id, mut expr, returning) =
106        query::plan_insert_query(scx, table_name, columns, source, returning)?;
107    expr.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
108    let returning = returning
109        .expr
110        .into_iter()
111        .map(|mut expr| {
112            expr.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
113            expr.lower_uncorrelated(scx.catalog.system_vars())
114        })
115        .collect::<Result<Vec<_>, _>>()?;
116
117    Ok(Plan::Insert(InsertPlan {
118        id,
119        values: expr,
120        returning,
121    }))
122}
123
124pub fn describe_delete(
125    scx: &StatementContext,
126    stmt: DeleteStatement<Aug>,
127) -> Result<StatementDesc, PlanError> {
128    query::plan_delete_query(scx, stmt)?;
129    Ok(StatementDesc::new(None))
130}
131
132pub fn plan_delete(
133    scx: &StatementContext,
134    stmt: DeleteStatement<Aug>,
135    params: &Params,
136) -> Result<Plan, PlanError> {
137    let rtw_plan = query::plan_delete_query(scx, stmt)?;
138    plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
139}
140
141pub fn describe_update(
142    scx: &StatementContext,
143    stmt: UpdateStatement<Aug>,
144) -> Result<StatementDesc, PlanError> {
145    query::plan_update_query(scx, stmt)?;
146    Ok(StatementDesc::new(None))
147}
148
149pub fn plan_update(
150    scx: &StatementContext,
151    stmt: UpdateStatement<Aug>,
152    params: &Params,
153) -> Result<Plan, PlanError> {
154    let rtw_plan = query::plan_update_query(scx, stmt)?;
155    plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
156}
157
158pub fn plan_read_then_write(
159    scx: &StatementContext,
160    kind: MutationKind,
161    params: &Params,
162    query::ReadThenWritePlan {
163        id,
164        mut selection,
165        finishing,
166        assignments,
167    }: query::ReadThenWritePlan,
168) -> Result<Plan, PlanError> {
169    selection.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
170    let mut assignments_outer = BTreeMap::new();
171    for (idx, mut set) in assignments {
172        set.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
173        let set = set.lower_uncorrelated(scx.catalog.system_vars())?;
174        assignments_outer.insert(idx, set);
175    }
176
177    Ok(Plan::ReadThenWrite(ReadThenWritePlan {
178        id,
179        selection,
180        finishing,
181        assignments: assignments_outer,
182        kind,
183        returning: Vec::new(),
184    }))
185}
186
187pub fn describe_select(
188    scx: &StatementContext,
189    stmt: SelectStatement<Aug>,
190) -> Result<StatementDesc, PlanError> {
191    if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
192        return Ok(StatementDesc::new(Some(desc)));
193    }
194
195    let query::PlannedRootQuery { desc, .. } =
196        query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
197    Ok(StatementDesc::new(Some(desc)))
198}
199
200pub fn plan_select(
201    scx: &StatementContext,
202    select: SelectStatement<Aug>,
203    params: &Params,
204    copy_to: Option<CopyFormat>,
205) -> Result<Plan, PlanError> {
206    if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
207        return Ok(Plan::SideEffectingFunc(f));
208    }
209
210    let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
211    Ok(Plan::Select(plan))
212}
213
214fn plan_select_inner(
215    scx: &StatementContext,
216    select: SelectStatement<Aug>,
217    params: &Params,
218    copy_to: Option<CopyFormat>,
219) -> Result<(SelectPlan, RelationDesc), PlanError> {
220    let when = query::plan_as_of(scx, select.as_of.clone())?;
221    let lifetime = QueryLifetime::OneShot;
222    let query::PlannedRootQuery {
223        mut expr,
224        desc,
225        finishing,
226        scope: _,
227    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
228    expr.bind_parameters_and_simplify_offset(scx, lifetime, params)?;
229
230    // We need to concretize the `limit` and `offset` of the RowSetFinishing, so that we go from
231    // `RowSetFinishing<HirScalarExpr, HirScalarExpr>` to `RowSetFinishing`.
232    // This involves binding parameters and evaluating each expression to a number.
233    // (This should be possible even for `limit` here, because we are at the top level of a SELECT,
234    // so this `limit` has to be a constant.)
235    let limit = match finishing.limit {
236        None => None,
237        Some(mut limit) => {
238            limit.bind_parameters_and_simplify_offset(scx, lifetime, params)?;
239            // TODO: Call `try_into_literal_int64` instead of `as_literal`.
240            let Some(limit) = limit.as_literal() else {
241                sql_bail!(
242                    "Top-level LIMIT must be a constant expression, got {}",
243                    limit
244                )
245            };
246            match limit {
247                Datum::Null => None,
248                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
249                _ => {
250                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
251                    sql_bail!("LIMIT must be a non-negative INT or NULL")
252                }
253            }
254        }
255    };
256    let offset = {
257        let mut offset = finishing.offset.clone();
258        offset.bind_parameters_and_simplify_offset(scx, lifetime, params)?;
259        let offset = offset_into_value(offset.take())?;
260        offset.try_into().map_err(|_| {
261            // We already checked in bind_parameters_and_simplify_offset / offset_into_value.
262            soft_panic_or_log!("unexpectedly negative OFFSET");
263            negative_offset_error(offset)
264        })?
265    };
266
267    // Unmaterializable functions are evaluated based on various information (e.g., the catalog)
268    // during sequencing, without taking AS OF into account. This would be hard to fix, so for now
269    // we just disallow AS OF when there is an unmaterializable function in a query (except mz_now).
270    if scx.is_feature_flag_enabled(&DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF)
271        && select.as_of.is_some()
272        && expr.contains_unmaterializable_except_temporal()?
273    {
274        bail_unsupported!("unmaterializable function (except `mz_now`) in an AS OF query");
275    }
276
277    let plan = SelectPlan {
278        source: expr,
279        when,
280        finishing: RowSetFinishing {
281            limit,
282            offset,
283            project: finishing.project,
284            order_by: finishing.order_by,
285        },
286        copy_to,
287        select: Some(Box::new(select)),
288    };
289
290    Ok((plan, desc))
291}
292
293pub fn describe_explain_plan(
294    scx: &StatementContext,
295    explain: ExplainPlanStatement<Aug>,
296) -> Result<StatementDesc, PlanError> {
297    let mut relation_desc = RelationDesc::builder();
298
299    match explain.stage() {
300        ExplainStage::RawPlan => {
301            let name = "Raw Plan";
302            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
303        }
304        ExplainStage::DecorrelatedPlan => {
305            let name = "Decorrelated Plan";
306            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
307        }
308        ExplainStage::LocalPlan => {
309            let name = "Locally Optimized Plan";
310            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
311        }
312        ExplainStage::GlobalPlan => {
313            let name = "Optimized Plan";
314            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
315        }
316        ExplainStage::PhysicalPlan => {
317            let name = "Physical Plan";
318            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
319        }
320        ExplainStage::Trace => {
321            relation_desc = relation_desc
322                .with_column("Time", SqlScalarType::UInt64.nullable(false))
323                .with_column("Path", SqlScalarType::String.nullable(false))
324                .with_column("Plan", SqlScalarType::String.nullable(false));
325        }
326        ExplainStage::PlanInsights => {
327            let name = "Plan Insights";
328            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
329        }
330    };
331    let relation_desc = relation_desc.finish();
332
333    Ok(
334        StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
335            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
336            _ => vec![],
337        }),
338    )
339}
340
341pub fn describe_explain_pushdown(
342    scx: &StatementContext,
343    statement: ExplainPushdownStatement<Aug>,
344) -> Result<StatementDesc, PlanError> {
345    let relation_desc = RelationDesc::builder()
346        .with_column("Source", SqlScalarType::String.nullable(false))
347        .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
348        .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
349        .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
350        .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
351        .finish();
352
353    Ok(
354        StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
355            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
356            _ => vec![],
357        }),
358    )
359}
360
361pub fn describe_explain_analyze_object(
362    _scx: &StatementContext,
363    statement: ExplainAnalyzeObjectStatement<Aug>,
364) -> Result<StatementDesc, PlanError> {
365    if statement.as_sql {
366        let relation_desc = RelationDesc::builder()
367            .with_column("SQL", SqlScalarType::String.nullable(false))
368            .finish();
369        return Ok(StatementDesc::new(Some(relation_desc)));
370    }
371
372    match statement.properties {
373        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
374            properties,
375            skew,
376        }) => {
377            let mut relation_desc = RelationDesc::builder()
378                .with_column("operator", SqlScalarType::String.nullable(false));
379
380            if skew {
381                relation_desc =
382                    relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
383            }
384
385            let mut seen_properties = BTreeSet::new();
386            for property in properties {
387                // handle each property only once (belt and suspenders)
388                if !seen_properties.insert(property) {
389                    continue;
390                }
391
392                match property {
393                    ExplainAnalyzeComputationProperty::Memory if skew => {
394                        let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
395                        relation_desc = relation_desc
396                            .with_column("memory_ratio", numeric.clone())
397                            .with_column("worker_memory", SqlScalarType::String.nullable(true))
398                            .with_column("avg_memory", SqlScalarType::String.nullable(true))
399                            .with_column("total_memory", SqlScalarType::String.nullable(true))
400                            .with_column("records_ratio", numeric.clone())
401                            .with_column("worker_records", numeric.clone())
402                            .with_column("avg_records", numeric.clone())
403                            .with_column("total_records", numeric);
404                    }
405                    ExplainAnalyzeComputationProperty::Memory => {
406                        relation_desc = relation_desc
407                            .with_column("total_memory", SqlScalarType::String.nullable(true))
408                            .with_column(
409                                "total_records",
410                                SqlScalarType::Numeric { max_scale: None }.nullable(true),
411                            );
412                    }
413                    ExplainAnalyzeComputationProperty::Cpu => {
414                        if skew {
415                            relation_desc = relation_desc
416                                .with_column(
417                                    "cpu_ratio",
418                                    SqlScalarType::Numeric { max_scale: None }.nullable(true),
419                                )
420                                .with_column(
421                                    "worker_elapsed",
422                                    SqlScalarType::Interval.nullable(true),
423                                )
424                                .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
425                        }
426                        relation_desc = relation_desc
427                            .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
428                    }
429                }
430            }
431
432            let relation_desc = relation_desc.finish();
433            Ok(StatementDesc::new(Some(relation_desc)))
434        }
435        ExplainAnalyzeProperty::Hints => {
436            let relation_desc = RelationDesc::builder()
437                .with_column("operator", SqlScalarType::String.nullable(true))
438                .with_column("levels", SqlScalarType::Int64.nullable(true))
439                .with_column("to_cut", SqlScalarType::Int64.nullable(true))
440                .with_column("hint", SqlScalarType::Float64.nullable(true))
441                .with_column("savings", SqlScalarType::String.nullable(true))
442                .finish();
443            Ok(StatementDesc::new(Some(relation_desc)))
444        }
445    }
446}
447
448pub fn describe_explain_analyze_cluster(
449    _scx: &StatementContext,
450    statement: ExplainAnalyzeClusterStatement,
451) -> Result<StatementDesc, PlanError> {
452    if statement.as_sql {
453        let relation_desc = RelationDesc::builder()
454            .with_column("SQL", SqlScalarType::String.nullable(false))
455            .finish();
456        return Ok(StatementDesc::new(Some(relation_desc)));
457    }
458
459    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
460
461    let mut relation_desc = RelationDesc::builder()
462        .with_column("object", SqlScalarType::String.nullable(false))
463        .with_column("global_id", SqlScalarType::String.nullable(false));
464
465    if skew {
466        relation_desc =
467            relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
468    }
469
470    let mut seen_properties = BTreeSet::new();
471    for property in properties {
472        // handle each property only once (belt and suspenders)
473        if !seen_properties.insert(property) {
474            continue;
475        }
476
477        match property {
478            ExplainAnalyzeComputationProperty::Memory if skew => {
479                let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
480                relation_desc = relation_desc
481                    .with_column("max_operator_memory_ratio", numeric.clone())
482                    .with_column("worker_memory", SqlScalarType::String.nullable(true))
483                    .with_column("avg_memory", SqlScalarType::String.nullable(true))
484                    .with_column("total_memory", SqlScalarType::String.nullable(true))
485                    .with_column("max_operator_records_ratio", numeric.clone())
486                    .with_column("worker_records", numeric.clone())
487                    .with_column("avg_records", numeric.clone())
488                    .with_column("total_records", numeric);
489            }
490            ExplainAnalyzeComputationProperty::Memory => {
491                relation_desc = relation_desc
492                    .with_column("total_memory", SqlScalarType::String.nullable(true))
493                    .with_column(
494                        "total_records",
495                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
496                    );
497            }
498            ExplainAnalyzeComputationProperty::Cpu if skew => {
499                relation_desc = relation_desc
500                    .with_column(
501                        "max_operator_cpu_ratio",
502                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
503                    )
504                    .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
505                    .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
506                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
507            }
508            ExplainAnalyzeComputationProperty::Cpu => {
509                relation_desc = relation_desc
510                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
511            }
512        }
513    }
514
515    Ok(StatementDesc::new(Some(relation_desc.finish())))
516}
517
518pub fn describe_explain_timestamp(
519    scx: &StatementContext,
520    ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
521) -> Result<StatementDesc, PlanError> {
522    let relation_desc = RelationDesc::builder()
523        .with_column("Timestamp", SqlScalarType::String.nullable(false))
524        .finish();
525
526    Ok(StatementDesc::new(Some(relation_desc))
527        .with_params(describe_select(scx, select)?.param_types))
528}
529
530pub fn describe_explain_schema(
531    _: &StatementContext,
532    ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
533) -> Result<StatementDesc, PlanError> {
534    let relation_desc = RelationDesc::builder()
535        .with_column("Schema", SqlScalarType::String.nullable(false))
536        .finish();
537    Ok(StatementDesc::new(Some(relation_desc)))
538}
539
// Option list handed to `generate_extracted_config!` for `ExplainPlanOption`. Each entry gives
// an option name, its value type, and its default.
// NOTE(review): the macro presumably generates the `ExplainPlanOptionExtracted` struct that the
// `TryFrom` impl below consumes -- confirm against the macro definition.
//
// Currently, there are two reasons why a flag should be `Option<bool>` instead of simply
// `bool`:
// - When it's an override of a global feature flag, for example optimizer feature flags. In this
//   case, we need not just false and true, but also None to say "take the value of the global
//   flag".
// - When it's an override of whether SOFT_ASSERTIONS are enabled. For example, when `Arity` is not
//   explicitly given in the EXPLAIN command, then we'd like staging and prod to default to true,
//   but otherwise we'd like to default to false.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
582
583impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
584    type Error = PlanError;
585
586    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
587        // If `WITH(raw)` is specified, ensure that the config will be as
588        // representative for the original plan as possible.
589        if v.raw {
590            v.raw_plans = true;
591            v.raw_syntax = true;
592        }
593
594        // Certain config should default to be enabled in release builds running on
595        // staging or prod (where SOFT_ASSERTIONS are turned off).
596        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();
597
598        Ok(ExplainConfig {
599            arity: v.arity.unwrap_or(enable_on_prod),
600            cardinality: v.cardinality,
601            column_names: v.column_names,
602            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
603            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
604            join_impls: v.join_implementations,
605            keys: v.keys,
606            linear_chains: !v.raw_plans && v.linear_chains,
607            no_fast_path: v.no_fast_path,
608            no_notices: v.no_notices,
609            node_ids: v.node_identifiers,
610            non_negative: v.non_negative,
611            raw_plans: v.raw_plans,
612            raw_syntax: v.raw_syntax,
613            verbose_syntax: false,
614            redacted: v.redacted,
615            subtree_size: v.subtree_size,
616            equivalences: v.equivalences,
617            timing: v.timing,
618            types: v.types,
619            // The ones that are initialized with `Default::default()` are not wired up to EXPLAIN.
620            features: OptimizerFeatureOverrides {
621                enable_eager_delta_joins: v.enable_eager_delta_joins,
622                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
623                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
624                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
625                enable_consolidate_after_union_negate: Default::default(),
626                enable_reduce_mfp_fusion: Default::default(),
627                enable_cardinality_estimates: Default::default(),
628                persist_fast_path_limit: Default::default(),
629                reoptimize_imported_views: v.reoptimize_imported_views,
630                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
631                enable_projection_pushdown_after_relation_cse: v
632                    .enable_projection_pushdown_after_relation_cse,
633                enable_less_reduce_in_eqprop: Default::default(),
634                enable_dequadratic_eqprop_map: Default::default(),
635                enable_eq_classes_withholding_errors: Default::default(),
636                enable_fast_path_plan_insights: Default::default(),
637                enable_cast_elimination: Default::default(),
638                enable_case_literal_transform: Default::default(),
639                enable_simplify_quantified_comparisons: Default::default(),
640                enable_coalesce_case_transform: Default::default(),
641            },
642        })
643    }
644}
645
646fn plan_explainee(
647    scx: &StatementContext,
648    explainee: Explainee<Aug>,
649    params: &Params,
650) -> Result<plan::Explainee, PlanError> {
651    use crate::plan::ExplaineeStatement;
652
653    let is_replan = matches!(
654        explainee,
655        Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
656    );
657
658    let explainee = match explainee {
659        Explainee::View(name) | Explainee::ReplanView(name) => {
660            let item = scx.get_item_by_resolved_name(&name)?;
661            let item_type = item.item_type();
662            if item_type != CatalogItemType::View {
663                sql_bail!("Expected {name} to be a view, not a {item_type}");
664            }
665            match is_replan {
666                true => crate::plan::Explainee::ReplanView(item.id()),
667                false => crate::plan::Explainee::View(item.id()),
668            }
669        }
670        Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
671            let item = scx.get_item_by_resolved_name(&name)?;
672            let item_type = item.item_type();
673            if item_type != CatalogItemType::MaterializedView {
674                sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
675            }
676            match is_replan {
677                true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
678                false => crate::plan::Explainee::MaterializedView(item.id()),
679            }
680        }
681        Explainee::Index(name) | Explainee::ReplanIndex(name) => {
682            let item = scx.get_item_by_resolved_name(&name)?;
683            let item_type = item.item_type();
684            if item_type != CatalogItemType::Index {
685                sql_bail!("Expected {name} to be an index, not a {item_type}");
686            }
687            match is_replan {
688                true => crate::plan::Explainee::ReplanIndex(item.id()),
689                false => crate::plan::Explainee::Index(item.id()),
690            }
691        }
692        Explainee::Select(select, broken) => {
693            let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
694            crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
695        }
696        Explainee::CreateView(mut stmt, broken) => {
697            if stmt.if_exists != IfExistsBehavior::Skip {
698                // If we don't force this parameter to Skip planning will
699                // fail for names that already exist in the catalog. This
700                // can happen even in `Replace` mode if the existing item
701                // has dependencies.
702                stmt.if_exists = IfExistsBehavior::Skip;
703            } else {
704                sql_bail!(
705                    "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
706                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
707                );
708            }
709
710            let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
711                sql_bail!("expected CreateViewPlan plan");
712            };
713
714            crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
715        }
716        Explainee::CreateMaterializedView(mut stmt, broken) => {
717            if stmt.if_exists != IfExistsBehavior::Skip {
718                // If we don't force this parameter to Skip planning will
719                // fail for names that already exist in the catalog. This
720                // can happen even in `Replace` mode if the existing item
721                // has dependencies.
722                stmt.if_exists = IfExistsBehavior::Skip;
723            } else {
724                sql_bail!(
725                    "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
726                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
727                );
728            }
729
730            let Plan::CreateMaterializedView(plan) =
731                ddl::plan_create_materialized_view(scx, *stmt)?
732            else {
733                sql_bail!("expected CreateMaterializedViewPlan plan");
734            };
735
736            crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
737                broken,
738                plan,
739            })
740        }
741        Explainee::CreateIndex(mut stmt, broken) => {
742            if !stmt.if_not_exists {
743                // If we don't force this parameter to true planning will
744                // fail for index items that already exist in the catalog.
745                stmt.if_not_exists = true;
746            } else {
747                sql_bail!(
748                    "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
749                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
750                );
751            }
752
753            let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
754                sql_bail!("expected CreateIndexPlan plan");
755            };
756
757            crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
758        }
759        Explainee::Subscribe(stmt, broken) => {
760            let Plan::Subscribe(plan) = plan_subscribe(scx, *stmt, params, None)? else {
761                sql_bail!("expected SubscribePlan");
762            };
763            crate::plan::Explainee::Statement(ExplaineeStatement::Subscribe { broken, plan })
764        }
765    };
766
767    Ok(explainee)
768}
769
770pub fn plan_explain_plan(
771    scx: &StatementContext,
772    explain: ExplainPlanStatement<Aug>,
773    params: &Params,
774) -> Result<Plan, PlanError> {
775    let (format, verbose_syntax) = match explain.format() {
776        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
777        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
778        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
779        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
780    };
781    let stage = explain.stage();
782
783    // Plan ExplainConfig.
784    let mut config = {
785        let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
786
787        if !scx.catalog.system_vars().persist_stats_filter_enabled() {
788            // If filtering is disabled, explain plans should not include pushdown info.
789            with_options.filter_pushdown = Some(false);
790        }
791
792        ExplainConfig::try_from(with_options)?
793    };
794    config.verbose_syntax = verbose_syntax;
795
796    let explainee = plan_explainee(scx, explain.explainee, params)?;
797
798    Ok(Plan::ExplainPlan(ExplainPlanPlan {
799        stage,
800        format,
801        config,
802        explainee,
803    }))
804}
805
806pub fn plan_explain_schema(
807    scx: &StatementContext,
808    explain_schema: ExplainSinkSchemaStatement<Aug>,
809) -> Result<Plan, PlanError> {
810    let ExplainSinkSchemaStatement {
811        schema_for,
812        // Parser limits to JSON.
813        format: _,
814        mut statement,
815    } = explain_schema;
816
817    // Force the sink's name to one that's guaranteed not to exist, by virtue of
818    // being a non-existent item in a schema under the system's control, so that
819    // `plan_create_sink` doesn't complain about the name already existing.
820    statement.name = Some(UnresolvedItemName::qualified(&[
821        ident!("mz_catalog"),
822        ident!("mz_explain_schema"),
823    ]));
824
825    crate::pure::purify_create_sink_avro_doc_on_options(
826        scx.catalog,
827        *statement.from.item_id(),
828        &mut statement.format,
829    )?;
830
831    match ddl::plan_create_sink(scx, statement)? {
832        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
833            StorageSinkConnection::Kafka(KafkaSinkConnection {
834                format:
835                    KafkaSinkFormat {
836                        key_format,
837                        value_format:
838                            KafkaSinkFormatType::Avro {
839                                schema: value_schema,
840                                ..
841                            },
842                        ..
843                    },
844                ..
845            }) => {
846                let schema = match schema_for {
847                    ExplainSinkSchemaFor::Key => key_format
848                        .and_then(|f| match f {
849                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
850                            _ => None,
851                        })
852                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
853                    ExplainSinkSchemaFor::Value => value_schema,
854                };
855
856                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
857                    sink_from: sink.from,
858                    json_schema: schema,
859                }))
860            }
861            _ => bail_unsupported!(
862                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
863            ),
864        },
865        _ => bail_internal!("plan_sink did not produce a CreateSink plan"),
866    }
867}
868
869pub fn plan_explain_pushdown(
870    scx: &StatementContext,
871    statement: ExplainPushdownStatement<Aug>,
872    params: &Params,
873) -> Result<Plan, PlanError> {
874    scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
875    let explainee = plan_explainee(scx, statement.explainee, params)?;
876    Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
877}
878
/// Plans `EXPLAIN ANALYZE` for a single named object by generating a SQL
/// query over `mz_introspection` relations.
///
/// The explainee must have a name and must plan to an index or a
/// materialized view; every other explainee is rejected. When the catalog
/// reports `restrict_to_user_objects()`, the active role must additionally
/// own the object.
///
/// The query text is assembled from five lists (CTEs, output columns,
/// `FROM`/`JOIN` items, predicates, `ORDER BY` keys) that are extended
/// depending on `statement.properties`:
///
/// * `Computation` with `Memory` and/or `Cpu`: adds a summary CTE per
///   property and, when `skew` is set, a per-worker CTE plus ratio columns.
/// * `Hints`: adds columns taken from
///   `mz_introspection.mz_expected_group_size_advice`.
///
/// The order in which entries are pushed onto these lists determines the
/// column and join order of the generated SQL.
///
/// If `statement.as_sql` is set, the pretty-printed query text is returned as
/// a single-row, single-column result instead of being planned. Otherwise the
/// query is planned through `ShowSelect`.
///
/// # Errors
///
/// Fails if the explainee has no name, if it is not an index or materialized
/// view, if the ownership check fails, if the generated SQL cannot be
/// pretty-printed, or if planning the explainee or the generated query fails.
pub fn plan_explain_analyze_object(
    scx: &StatementContext,
    statement: ExplainAnalyzeObjectStatement<Aug>,
    params: &Params,
) -> Result<Plan, PlanError> {
    // Capture the name first: `statement.explainee` is moved into
    // `plan_explainee` right below.
    let explainee_name = statement
        .explainee
        .name()
        .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
        .full_name_str();
    let explainee = plan_explainee(scx, statement.explainee, params)?;

    // Rejects the request unless the active role owns the item. The check is
    // only enforced when the catalog restricts access to user objects.
    let check_ownership = |item_id: &CatalogItemId, item_type: &str| -> Result<(), PlanError> {
        if scx.catalog.restrict_to_user_objects() {
            let item = scx.catalog.get_item(item_id);
            if item.owner_id() != *scx.catalog.active_role_id() {
                let full_name = scx.catalog.resolve_full_name(item.name());
                return Err(sql_err!("must be owner of {item_type} {full_name}"));
            }
        }
        Ok(())
    };
    // Only indexes and materialized views are accepted as explainees here.
    match &explainee {
        plan::Explainee::Index(item_id) => check_ownership(item_id, "INDEX")?,
        plan::Explainee::MaterializedView(item_id) => {
            check_ownership(item_id, "MATERIALIZED VIEW")?
        }
        _ => return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",)),
    };

    // generate SQL query

    /* WITH {CTEs}
       SELECT REPEAT(' ', nesting * 2) || operator AS operator
             {columns}
        FROM      mz_introspection.mz_lir_mapping mlm
             JOIN {from} USING (lir_id)
             JOIN mz_introspection.mz_mappable_objects mo
               ON (mlm.global_id = mo.global_id)
       WHERE     mo.name = {escaped explainee_name}
             AND {predicates}
       ORDER BY lir_id DESC
    */
    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
    let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
    let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
    // The object name is embedded as an escaped string literal.
    let mut predicates = vec![format!(
        "mo.name = {}",
        escaped_string_literal(&explainee_name)
    )];
    let mut order_by = vec!["mlm.lir_id DESC"];

    match statement.properties {
        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
            properties,
            skew,
        }) => {
            // Remembers which per-worker CTE supplies the `worker_id` output
            // column. The first skewed property sets it; any later skewed
            // property is tied to it through an equality predicate.
            let mut worker_id = None;
            let mut seen_properties = BTreeSet::new();
            for property in properties {
                // handle each property only once (belt and suspenders)
                if !seen_properties.insert(property) {
                    continue;
                }

                match property {
                    ExplainAnalyzeComputationProperty::Memory => {
                        // Per-operator totals and per-worker averages of
                        // arrangement size and record counts.
                        ctes.push((
                            "summary_memory",
                            r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         SUM(mas.size) AS total_memory,
         SUM(mas.records) AS total_records,
         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
                 ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                        ));
                        from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");

                        if skew {
                            // Same aggregation, but additionally grouped by worker.
                            ctes.push((
                                "per_worker_memory",
                                r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         mas.worker_id AS worker_id,
         SUM(mas.size) AS worker_memory,
         SUM(mas.records) AS worker_records
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
                 ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                            ));
                            from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");

                            if let Some(worker_id) = worker_id {
                                // Another property already exposes `worker_id`:
                                // restrict to rows for the same worker.
                                predicates.push(format!("pwm.worker_id = {worker_id}"));
                            } else {
                                worker_id = Some("pwm.worker_id");
                                columns.push("pwm.worker_id AS worker_id");
                                order_by.push("worker_id");
                            }

                            columns.extend([
                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
                                "pg_size_pretty(pwm.worker_memory) AS worker_memory",
                                "pg_size_pretty(sm.avg_memory) AS avg_memory",
                                "pg_size_pretty(sm.total_memory) AS total_memory",
                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
                                "pwm.worker_records AS worker_records",
                                "sm.avg_records AS avg_records",
                                "sm.total_records AS total_records",
                            ]);
                        } else {
                            columns.extend([
                                "pg_size_pretty(sm.total_memory) AS total_memory",
                                "sm.total_records AS total_records",
                            ]);
                        }
                    }
                    ExplainAnalyzeComputationProperty::Cpu => {
                        // Per-operator total and per-worker average of
                        // elapsed scheduling time.
                        ctes.push((
                            "summary_cpu",
                            r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         SUM(mse.elapsed_ns) AS total_ns,
         CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
                 ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                        ));
                        from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");

                        if skew {
                            // Same aggregation, but additionally grouped by worker.
                            ctes.push((
                                "per_worker_cpu",
                                r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         mse.worker_id AS worker_id,
         SUM(mse.elapsed_ns) AS worker_ns
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
                 ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                            ));
                            from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");

                            if let Some(worker_id) = worker_id {
                                // Another property already exposes `worker_id`:
                                // restrict to rows for the same worker.
                                predicates.push(format!("pwc.worker_id = {worker_id}"));
                            } else {
                                worker_id = Some("pwc.worker_id");
                                columns.push("pwc.worker_id AS worker_id");
                                order_by.push("worker_id");
                            }

                            columns.extend([
                                "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
                                "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
                                "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
                            ]);
                        }
                        // The total is reported with and without skew.
                        columns.push(
                            "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
                        );
                    }
                }
            }
        }
        ExplainAnalyzeProperty::Hints => {
            columns.extend([
                "megsa.levels AS levels",
                "megsa.to_cut AS to_cut",
                "megsa.hint AS hint",
                "pg_size_pretty(megsa.savings) AS savings",
            ]);
            from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
            "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
             mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
        }
    }

    // Joined last; the `mo.name` predicate set up above filters on this relation.
    from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");

    // Render the `WITH` clause only if at least one CTE was collected.
    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    let predicates = separated(" AND ", predicates);
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
WHERE {predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // Return the generated SQL itself (pretty-printed at width 80) as a
        // one-row, one-column result.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        // Plan the generated query and record the ids it resolved.
        let (show_select, resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        scx.record_sql_impl_ids(&resolved_ids);
        show_select.plan()
    }
}
1112
1113pub fn plan_explain_analyze_cluster(
1114    scx: &StatementContext,
1115    statement: ExplainAnalyzeClusterStatement,
1116    _params: &Params,
1117) -> Result<Plan, PlanError> {
1118    // object string
1119    // worker_id uint64        (if           skew)
1120    // memory_ratio numeric    (if memory && skew)
1121    // worker_memory string    (if memory && skew)
1122    // avg_memory string       (if memory && skew)
1123    // total_memory string     (if memory)
1124    // records_ratio numeric   (if memory && skew)
1125    // worker_records          (if memory && skew)
1126    // avg_records numeric     (if memory && skew)
1127    // total_records numeric   (if memory)
1128    // cpu_ratio numeric       (if cpu    && skew)
1129    // worker_elapsed interval (if cpu    && skew)
1130    // avg_elapsed interval    (if cpu    && skew)
1131    // total_elapsed interval  (if cpu)
1132
1133    /* WITH {CTEs}
1134       SELECT mo.name AS object
1135             {columns}
1136        FROM mz_introspection.mz_mappable_objects mo
1137             {from}
1138       WHERE {predicates}
1139       ORDER BY {order_by}, mo.name DESC
1140    */
1141    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
1142    let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
1143    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
1144    let mut predicates = vec![];
1145    let mut order_by = vec![];
1146
1147    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
1148    let mut worker_id = None;
1149    let mut seen_properties = BTreeSet::new();
1150    for property in properties {
1151        // handle each property only once (belt and suspenders)
1152        if !seen_properties.insert(property) {
1153            continue;
1154        }
1155
1156        match property {
1157            ExplainAnalyzeComputationProperty::Memory => {
1158                if skew {
1159                    let mut set_worker_id = false;
1160                    if let Some(worker_id) = worker_id {
1161                        // join condition if we're showing skew for more than one property
1162                        predicates.push(format!("om.worker_id = {worker_id}"));
1163                    } else {
1164                        worker_id = Some("om.worker_id");
1165                        columns.push("om.worker_id AS worker_id");
1166                        set_worker_id = true; // we'll add ourselves to `order_by` later
1167                    };
1168
1169                    // computes the average memory per LIR operator (for per operator ratios)
1170                    ctes.push((
1171                    "per_operator_memory_summary",
1172                    r#"
1173SELECT mlm.global_id AS global_id,
1174       mlm.lir_id AS lir_id,
1175       SUM(mas.size) AS total_memory,
1176       SUM(mas.records) AS total_records,
1177       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
1178       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
1179FROM        mz_introspection.mz_lir_mapping mlm
1180 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1181       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1182         ON (mas.operator_id = valid_id)
1183GROUP BY mlm.global_id, mlm.lir_id"#,
1184                ));
1185
1186                    // computes the memory per worker in a per operator way
1187                    ctes.push((
1188                    "per_operator_memory_per_worker",
1189                    r#"
1190SELECT mlm.global_id AS global_id,
1191       mlm.lir_id AS lir_id,
1192       mas.worker_id AS worker_id,
1193       SUM(mas.size) AS worker_memory,
1194       SUM(mas.records) AS worker_records
1195FROM        mz_introspection.mz_lir_mapping mlm
1196 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1197       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1198         ON (mas.operator_id = valid_id)
1199GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
1200                    ));
1201
1202                    // computes memory ratios per worker per operator
1203                    ctes.push((
1204                    "per_operator_memory_ratios",
1205                    r#"
1206SELECT pompw.global_id AS global_id,
1207       pompw.lir_id AS lir_id,
1208       pompw.worker_id AS worker_id,
1209       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
1210       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
1211  FROM      per_operator_memory_per_worker pompw
1212       JOIN per_operator_memory_summary poms
1213         USING (global_id, lir_id)
1214"#,
1215                    ));
1216
1217                    // summarizes each object, per worker
1218                    ctes.push((
1219                        "object_memory",
1220                        r#"
1221SELECT pompw.global_id AS global_id,
1222       pompw.worker_id AS worker_id,
1223       MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
1224       MAX(pomr.records_ratio) AS max_operator_records_ratio,
1225       SUM(pompw.worker_memory) AS worker_memory,
1226       SUM(pompw.worker_records) AS worker_records
1227FROM        per_operator_memory_per_worker pompw
1228     JOIN   per_operator_memory_ratios pomr
1229     USING (global_id, worker_id, lir_id)
1230GROUP BY pompw.global_id, pompw.worker_id
1231"#,
1232                    ));
1233
1234                    // summarizes each worker
1235                    ctes.push(("object_average_memory", r#"
1236SELECT om.global_id AS global_id,
1237       SUM(om.worker_memory) AS total_memory,
1238       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
1239       SUM(om.worker_records) AS total_records,
1240       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
1241  FROM object_memory om
1242GROUP BY om.global_id"#));
1243
1244                    from.push("LEFT JOIN object_memory om USING (global_id)");
1245                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");
1246
1247                    columns.extend([
1248                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
1249                        "pg_size_pretty(om.worker_memory) AS worker_memory",
1250                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
1251                        "pg_size_pretty(oam.total_memory) AS total_memory",
1252                        "om.max_operator_records_ratio AS max_operator_records_ratio",
1253                        "om.worker_records AS worker_records",
1254                        "oam.avg_records AS avg_records",
1255                        "oam.total_records AS total_records",
1256                    ]);
1257
1258                    order_by.extend([
1259                        "max_operator_memory_ratio DESC NULLS LAST",
1260                        "max_operator_records_ratio DESC NULLS LAST",
1261                        "om.worker_memory DESC NULLS LAST",
1262                        "worker_records DESC NULLS LAST",
1263                    ]);
1264
1265                    if set_worker_id {
1266                        order_by.push("worker_id");
1267                    }
1268                } else {
1269                    // no skew, so just compute totals
1270                    ctes.push((
1271                        "per_operator_memory_totals",
1272                        r#"
1273    SELECT mlm.global_id AS global_id,
1274           mlm.lir_id AS lir_id,
1275           SUM(mas.size) AS total_memory,
1276           SUM(mas.records) AS total_records
1277    FROM        mz_introspection.mz_lir_mapping mlm
1278     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1279           JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1280             ON (mas.operator_id = valid_id)
1281    GROUP BY mlm.global_id, mlm.lir_id"#,
1282                    ));
1283
1284                    ctes.push((
1285                        "object_memory_totals",
1286                        r#"
1287SELECT pomt.global_id AS global_id,
1288       SUM(pomt.total_memory) AS total_memory,
1289       SUM(pomt.total_records) AS total_records
1290FROM per_operator_memory_totals pomt
1291GROUP BY pomt.global_id
1292"#,
1293                    ));
1294
1295                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
1296                    columns.extend([
1297                        "pg_size_pretty(omt.total_memory) AS total_memory",
1298                        "omt.total_records AS total_records",
1299                    ]);
1300                    order_by.extend([
1301                        "omt.total_memory DESC NULLS LAST",
1302                        "total_records DESC NULLS LAST",
1303                    ]);
1304                }
1305            }
1306            ExplainAnalyzeComputationProperty::Cpu => {
1307                if skew {
1308                    let mut set_worker_id = false;
1309                    if let Some(worker_id) = worker_id {
1310                        // join condition if we're showing skew for more than one property
1311                        predicates.push(format!("oc.worker_id = {worker_id}"));
1312                    } else {
1313                        worker_id = Some("oc.worker_id");
1314                        columns.push("oc.worker_id AS worker_id");
1315                        set_worker_id = true; // we'll add ourselves to `order_by` later
1316                    };
1317
                    // computes the average CPU time per LIR operator (for per operator ratios)
1319                    ctes.push((
1320    "per_operator_cpu_summary",
1321    r#"
1322SELECT mlm.global_id AS global_id,
1323       mlm.lir_id AS lir_id,
1324       SUM(mse.elapsed_ns) AS total_ns,
1325       CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
1326FROM       mz_introspection.mz_lir_mapping mlm
1327CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1328      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1329        ON (mse.id = valid_id)
1330GROUP BY mlm.global_id, mlm.lir_id"#,
1331));
1332
1333                    // computes the CPU per worker in a per operator way
1334                    ctes.push((
1335                        "per_operator_cpu_per_worker",
1336                        r#"
1337SELECT mlm.global_id AS global_id,
1338       mlm.lir_id AS lir_id,
1339       mse.worker_id AS worker_id,
1340       SUM(mse.elapsed_ns) AS worker_ns
1341FROM       mz_introspection.mz_lir_mapping mlm
1342CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1343      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1344        ON (mse.id = valid_id)
1345GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1346                    ));
1347
1348                    // computes CPU ratios per worker per operator
1349                    ctes.push((
1350                        "per_operator_cpu_ratios",
1351                        r#"
1352SELECT pocpw.global_id AS global_id,
1353       pocpw.lir_id AS lir_id,
1354       pocpw.worker_id AS worker_id,
1355       CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
1356FROM      per_operator_cpu_per_worker pocpw
1357     JOIN per_operator_cpu_summary pocs
1358     USING (global_id, lir_id)
1359"#,
1360                    ));
1361
1362                    // summarizes each object, per worker
1363                    ctes.push((
1364                        "object_cpu",
1365                        r#"
1366SELECT pocpw.global_id AS global_id,
1367       pocpw.worker_id AS worker_id,
1368       MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
1369       SUM(pocpw.worker_ns) AS worker_ns
1370FROM      per_operator_cpu_per_worker pocpw
1371     JOIN per_operator_cpu_ratios pomr
1372     USING (global_id, worker_id, lir_id)
1373GROUP BY pocpw.global_id, pocpw.worker_id
1374"#,
1375                    ));
1376
1377                    // summarizes each worker
1378                    ctes.push((
1379                        "object_average_cpu",
1380                        r#"
1381SELECT oc.global_id AS global_id,
1382       SUM(oc.worker_ns) AS total_ns,
1383       CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
1384  FROM object_cpu oc
1385GROUP BY oc.global_id"#,));
1386
1387                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
1388                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");
1389
1390                    columns.extend([
1391                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
1392                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
1393                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
1394                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
1395                    ]);
1396
1397                    order_by.extend([
1398                        "max_operator_cpu_ratio DESC NULLS LAST",
1399                        "worker_elapsed DESC NULLS LAST",
1400                    ]);
1401
1402                    if set_worker_id {
1403                        order_by.push("worker_id");
1404                    }
1405                } else {
1406                    // no skew, so just compute totals
1407                    ctes.push((
1408                        "per_operator_cpu_totals",
1409                        r#"
1410    SELECT mlm.global_id AS global_id,
1411           mlm.lir_id AS lir_id,
1412           SUM(mse.elapsed_ns) AS total_ns
1413    FROM        mz_introspection.mz_lir_mapping mlm
1414     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1415           JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1416             ON (mse.id = valid_id)
1417    GROUP BY mlm.global_id, mlm.lir_id"#,
1418                    ));
1419
1420                    ctes.push((
1421                        "object_cpu_totals",
1422                        r#"
1423SELECT poct.global_id AS global_id,
1424       SUM(poct.total_ns) AS total_ns
1425FROM per_operator_cpu_totals poct
1426GROUP BY poct.global_id
1427"#,
1428                    ));
1429
1430                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
1431                    columns
1432                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
1433                    order_by.extend(["total_elapsed DESC NULLS LAST"]);
1434                }
1435            }
1436        }
1437    }
1438
1439    // generate SQL query text
1440    let ctes = if !ctes.is_empty() {
1441        format!(
1442            "WITH {}",
1443            separated(
1444                ",\n",
1445                ctes.iter()
1446                    .map(|(name, defn)| format!("{name} AS ({defn})"))
1447            )
1448        )
1449    } else {
1450        String::new()
1451    };
1452    let columns = separated(", ", columns);
1453    let from = separated(" ", from);
1454    let predicates = if !predicates.is_empty() {
1455        format!("WHERE {}", separated(" AND ", predicates))
1456    } else {
1457        String::new()
1458    };
1459    // add mo.name last, to break ties only
1460    order_by.push("mo.name DESC");
1461    let order_by = separated(", ", order_by);
1462    let query = format!(
1463        r#"{ctes}
1464SELECT {columns}
1465FROM {from}
1466{predicates}
1467ORDER BY {order_by}"#
1468    );
1469
1470    if statement.as_sql {
1471        let rows = vec![Row::pack_slice(&[Datum::String(
1472            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1473                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1474            })?,
1475        )])];
1476        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1477
1478        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1479    } else {
1480        let (show_select, resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1481        scx.record_sql_impl_ids(&resolved_ids);
1482        show_select.plan()
1483    }
1484}
1485
1486pub fn plan_explain_timestamp(
1487    scx: &StatementContext,
1488    explain: ExplainTimestampStatement<Aug>,
1489) -> Result<Plan, PlanError> {
1490    let (format, _verbose_syntax) = match explain.format() {
1491        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1492        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1493        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1494        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1495    };
1496
1497    let raw_plan = {
1498        let query::PlannedRootQuery {
1499            expr: raw_plan,
1500            desc: _,
1501            finishing: _,
1502            scope: _,
1503        } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1504        if raw_plan.contains_parameters()? {
1505            return Err(PlanError::ParameterNotAllowed(
1506                "EXPLAIN TIMESTAMP".to_string(),
1507            ));
1508        }
1509
1510        raw_plan
1511    };
1512    let when = query::plan_as_of(scx, explain.select.as_of)?;
1513
1514    Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1515        format,
1516        raw_plan,
1517        when,
1518    }))
1519}
1520
// Generates `SubscribeOptionExtracted`, the typed form of a `SUBSCRIBE`
// statement's `WITH` options. Its `snapshot` and `progress` fields are read
// by `describe_subscribe` and `plan_subscribe`, which treat an unset
// `SNAPSHOT` as `true` and an unset `PROGRESS` as `false`.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1522
1523pub fn describe_subscribe(
1524    scx: &StatementContext,
1525    stmt: SubscribeStatement<Aug>,
1526) -> Result<StatementDesc, PlanError> {
1527    let relation_desc = match stmt.relation {
1528        SubscribeRelation::Name(name) => {
1529            let item = scx.get_item_by_resolved_name(&name)?;
1530            match item.relation_desc() {
1531                Some(desc) => desc.into_owned(),
1532                None => sql_bail!(
1533                    "'{}' cannot be subscribed to because it is a {}",
1534                    name.full_name_str(),
1535                    item.item_type(),
1536                ),
1537            }
1538        }
1539        SubscribeRelation::Query(query) => {
1540            let query::PlannedRootQuery { desc, .. } =
1541                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
1542            desc
1543        }
1544    };
1545    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
1546    let progress = progress.unwrap_or(false);
1547    let mut desc = RelationDesc::builder().with_column(
1548        "mz_timestamp",
1549        SqlScalarType::Numeric {
1550            max_scale: Some(NumericMaxScale::ZERO),
1551        }
1552        .nullable(false),
1553    );
1554    if progress {
1555        desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
1556    }
1557
1558    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
1559    match stmt.output {
1560        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
1561            desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
1562            for (name, mut ty) in relation_desc.into_iter() {
1563                if progress {
1564                    ty.nullable = true;
1565                }
1566                desc = desc.with_column(name, ty);
1567            }
1568        }
1569        SubscribeOutput::EnvelopeUpsert { key_columns }
1570        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
1571            desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
1572            let key_columns = key_columns
1573                .into_iter()
1574                .map(normalize::column_name)
1575                .collect_vec();
1576            let mut before_values_desc = RelationDesc::builder();
1577            let mut after_values_desc = RelationDesc::builder();
1578
1579            // Add the key columns in the order that they're specified.
1580            for column_name in &key_columns {
1581                let mut column_ty = relation_desc
1582                    .get_by_name(column_name)
1583                    .map(|(_pos, ty)| ty.clone())
1584                    .ok_or_else(|| PlanError::UnknownColumn {
1585                        table: None,
1586                        column: column_name.clone(),
1587                        similar: Box::new([]),
1588                    })?;
1589                if progress {
1590                    column_ty.nullable = true;
1591                }
1592                desc = desc.with_column(column_name, column_ty);
1593            }
1594
1595            // Then add the remaining columns in the order from the original
1596            // table, filtering out the key columns since we added those above.
1597            for (mut name, mut ty) in relation_desc
1598                .into_iter()
1599                .filter(|(name, _ty)| !key_columns.contains(name))
1600            {
1601                ty.nullable = true;
1602                before_values_desc =
1603                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
1604                if debezium {
1605                    name = format!("after_{}", name).into();
1606                }
1607                after_values_desc = after_values_desc.with_column(name, ty);
1608            }
1609
1610            if debezium {
1611                desc = desc.concat(before_values_desc);
1612            }
1613            desc = desc.concat(after_values_desc);
1614        }
1615    }
1616    Ok(StatementDesc::new(Some(desc.finish())))
1617}
1618
1619pub fn plan_subscribe(
1620    scx: &StatementContext,
1621    SubscribeStatement {
1622        relation,
1623        options,
1624        as_of,
1625        up_to,
1626        output,
1627    }: SubscribeStatement<Aug>,
1628    params: &Params,
1629    copy_to: Option<CopyFormat>,
1630) -> Result<Plan, PlanError> {
1631    let (from, desc, scope) = match relation {
1632        SubscribeRelation::Name(name) => {
1633            let item = scx.get_item_by_resolved_name(&name)?;
1634            let Some(desc) = item.relation_desc() else {
1635                sql_bail!(
1636                    "'{}' cannot be subscribed to because it is a {}",
1637                    name.full_name_str(),
1638                    item.item_type(),
1639                );
1640            };
1641            let item_name = match name {
1642                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
1643                _ => None,
1644            };
1645            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
1646            (
1647                SubscribeFrom::Id(item.global_id()),
1648                desc.into_owned(),
1649                scope,
1650            )
1651        }
1652        SubscribeRelation::Query(query) => {
1653            #[allow(deprecated)] // TODO(aalexandrov): Use HirRelationExpr in Subscribe
1654            let query::PlannedRootQuery {
1655                mut expr,
1656                desc,
1657                finishing,
1658                scope,
1659            } = query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
1660            expr.bind_parameters_and_simplify_offset(scx, QueryLifetime::Subscribe, params)?;
1661            let query = query::PlannedRootQuery {
1662                expr,
1663                desc,
1664                finishing,
1665                scope,
1666            };
1667            // There's no way to apply finishing operations to a `SUBSCRIBE` directly, so the
1668            // finishing should have already been turned into a `TopK` by
1669            // `plan_query` / `plan_root_query`, upon seeing the `QueryLifetime::Subscribe`.
1670            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
1671                &query.finishing,
1672                query.desc.arity()
1673            ));
1674            let desc = query.desc.clone();
1675            (
1676                SubscribeFrom::Query {
1677                    expr: query.expr,
1678                    desc: query.desc,
1679                },
1680                desc,
1681                query.scope,
1682            )
1683        }
1684    };
1685
1686    let when = query::plan_as_of(scx, as_of)?;
1687    let up_to = up_to
1688        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
1689        .transpose()?;
1690
1691    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
1692    let ecx = ExprContext {
1693        qcx: &qcx,
1694        name: "",
1695        scope: &scope,
1696        relation_type: desc.typ(),
1697        allow_aggregates: false,
1698        allow_subqueries: true,
1699        allow_parameters: true,
1700        allow_windows: false,
1701    };
1702
1703    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1704    let output = match output {
1705        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
1706        SubscribeOutput::EnvelopeUpsert { key_columns } => {
1707            let order_by = key_columns
1708                .iter()
1709                .map(|ident| OrderByExpr {
1710                    expr: Expr::Identifier(vec![ident.clone()]),
1711                    asc: None,
1712                    nulls_last: None,
1713                })
1714                .collect_vec();
1715            let (order_by, map_exprs) = query::plan_order_by_exprs(
1716                &ExprContext {
1717                    name: "ENVELOPE UPSERT KEY clause",
1718                    ..ecx
1719                },
1720                &order_by[..],
1721                &output_columns[..],
1722            )?;
1723            if !map_exprs.is_empty() {
1724                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
1725            }
1726            plan::SubscribeOutput::EnvelopeUpsert {
1727                order_by_keys: order_by,
1728            }
1729        }
1730        SubscribeOutput::EnvelopeDebezium { key_columns } => {
1731            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
1732            let order_by = key_columns
1733                .iter()
1734                .map(|ident| OrderByExpr {
1735                    expr: Expr::Identifier(vec![ident.clone()]),
1736                    asc: None,
1737                    nulls_last: None,
1738                })
1739                .collect_vec();
1740            let (order_by, map_exprs) = query::plan_order_by_exprs(
1741                &ExprContext {
1742                    name: "ENVELOPE DEBEZIUM KEY clause",
1743                    ..ecx
1744                },
1745                &order_by[..],
1746                &output_columns[..],
1747            )?;
1748            if !map_exprs.is_empty() {
1749                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
1750            }
1751            plan::SubscribeOutput::EnvelopeDebezium {
1752                order_by_keys: order_by,
1753            }
1754        }
1755        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
1756            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
1757            let mz_diff = "mz_diff".into();
1758            let output_columns = std::iter::once((0, &mz_diff))
1759                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
1760                .collect_vec();
1761            match query::plan_order_by_exprs(
1762                &ExprContext {
1763                    name: "WITHIN TIMESTAMP ORDER BY clause",
1764                    ..ecx
1765                },
1766                &order_by[..],
1767                &output_columns[..],
1768            ) {
1769                Err(PlanError::UnknownColumn {
1770                    table: None,
1771                    column,
1772                    similar: _,
1773                }) if &column == &mz_diff => {
1774                    // mz_diff is being used in an expression. Since mz_diff isn't part of the table
1775                    // it looks like an unknown column. Instead, return a better error
1776                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1777                }
1778                Err(e) => return Err(e),
1779                Ok((order_by, map_exprs)) => {
1780                    if !map_exprs.is_empty() {
1781                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1782                    }
1783
1784                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
1785                }
1786            }
1787        }
1788    };
1789
1790    let SubscribeOptionExtracted {
1791        progress, snapshot, ..
1792    } = options.try_into()?;
1793    Ok(Plan::Subscribe(SubscribePlan {
1794        from,
1795        when,
1796        up_to,
1797        with_snapshot: snapshot.unwrap_or(true),
1798        copy_to,
1799        emit_progress: progress.unwrap_or(false),
1800        output,
1801    }))
1802}
1803
1804pub fn describe_copy_from_table(
1805    scx: &StatementContext,
1806    table_name: <Aug as AstInfo>::ItemName,
1807    columns: Vec<Ident>,
1808) -> Result<StatementDesc, PlanError> {
1809    let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1810    Ok(StatementDesc::new(Some(desc)))
1811}
1812
1813pub fn describe_copy_item(
1814    scx: &StatementContext,
1815    object_name: <Aug as AstInfo>::ItemName,
1816    columns: Vec<Ident>,
1817) -> Result<StatementDesc, PlanError> {
1818    let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1819    Ok(StatementDesc::new(Some(desc)))
1820}
1821
1822pub fn describe_copy(
1823    scx: &StatementContext,
1824    CopyStatement {
1825        relation,
1826        direction,
1827        ..
1828    }: CopyStatement<Aug>,
1829) -> Result<StatementDesc, PlanError> {
1830    Ok(match (relation, direction) {
1831        (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1832            describe_copy_item(scx, name, columns)?
1833        }
1834        (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1835            describe_copy_from_table(scx, name, columns)?
1836        }
1837        (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1838        (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1839    }
1840    .with_is_copy())
1841}
1842
1843fn plan_copy_to_expr(
1844    scx: &StatementContext,
1845    select_plan: SelectPlan,
1846    desc: RelationDesc,
1847    to: &Expr<Aug>,
1848    format: CopyFormat,
1849    options: CopyOptionExtracted,
1850) -> Result<Plan, PlanError> {
1851    let conn_id = match options.aws_connection {
1852        Some(conn_id) => CatalogItemId::from(conn_id),
1853        None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1854    };
1855    let connection = scx.get_item(&conn_id).connection()?;
1856
1857    match connection {
1858        mz_storage_types::connections::Connection::Aws(_) => {}
1859        _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1860    }
1861
1862    let format = match format {
1863        CopyFormat::Csv => {
1864            let quote = extract_byte_param_value(options.quote, "quote")?;
1865            let escape = extract_byte_param_value(options.escape, "escape")?;
1866            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1867            S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1868                CopyCsvFormatParams::try_new(
1869                    delimiter,
1870                    quote,
1871                    escape,
1872                    options.header,
1873                    options.null,
1874                )
1875                .map_err(|e| sql_err!("{}", e))?,
1876            ))
1877        }
1878        CopyFormat::Parquet => {
1879            // Validate that the output desc can be formatted as parquet.
1880            // COPY TO does not apply any type overrides, so pass `|_| None`.
1881            ArrowBuilder::validate_desc_for_parquet(&desc, |_| None)
1882                .map_err(|e| sql_err!("{}", e))?;
1883            S3SinkFormat::Parquet
1884        }
1885        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1886        CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1887    };
1888
1889    // Converting the to expr to a HirScalarExpr
1890    let mut to_expr = to.clone();
1891    transform_ast::transform(scx, &mut to_expr)?;
1892    let relation_type = RelationDesc::empty();
1893    let ecx = &ExprContext {
1894        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1895        name: "COPY TO target",
1896        scope: &Scope::empty(),
1897        relation_type: relation_type.typ(),
1898        allow_aggregates: false,
1899        allow_subqueries: false,
1900        allow_parameters: false,
1901        allow_windows: false,
1902    };
1903
1904    let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;
1905
1906    if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1907        sql_bail!(
1908            "MAX FILE SIZE cannot be less than {}",
1909            MIN_S3_SINK_FILE_SIZE
1910        );
1911    }
1912    if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1913        sql_bail!(
1914            "MAX FILE SIZE cannot be greater than {}",
1915            MAX_S3_SINK_FILE_SIZE
1916        );
1917    }
1918
1919    Ok(Plan::CopyTo(CopyToPlan {
1920        select_plan,
1921        desc,
1922        to,
1923        connection: connection.to_owned(),
1924        connection_id: conn_id,
1925        format,
1926        max_file_size: options.max_file_size.as_bytes(),
1927    }))
1928}
1929
1930fn plan_copy_from(
1931    scx: &StatementContext,
1932    target: &CopyTarget<Aug>,
1933    table_name: ResolvedItemName,
1934    columns: Vec<Ident>,
1935    format: CopyFormat,
1936    options: CopyOptionExtracted,
1937) -> Result<Plan, PlanError> {
1938    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
1939        match option {
1940            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
1941            None => Ok(()),
1942        }
1943    }
1944
1945    let source = match target {
1946        CopyTarget::Stdin => CopyFromSource::Stdin,
1947        CopyTarget::Expr(from) => {
1948            // Converting the expr to an HirScalarExpr
1949            let mut from_expr = from.clone();
1950            transform_ast::transform(scx, &mut from_expr)?;
1951            let relation_type = RelationDesc::empty();
1952            let ecx = &ExprContext {
1953                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1954                name: "COPY FROM target",
1955                scope: &Scope::empty(),
1956                relation_type: relation_type.typ(),
1957                allow_aggregates: false,
1958                allow_subqueries: false,
1959                allow_parameters: false,
1960                allow_windows: false,
1961            };
1962            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;
1963
1964            match options.aws_connection {
1965                Some(conn_id) => {
1966                    let conn_id = CatalogItemId::from(conn_id);
1967
1968                    // Validate the connection type is one we expect.
1969                    let connection = match scx.get_item(&conn_id).connection()? {
1970                        mz_storage_types::connections::Connection::Aws(conn) => conn,
1971                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
1972                    };
1973
1974                    CopyFromSource::AwsS3 {
1975                        uri: from,
1976                        connection,
1977                        connection_id: conn_id,
1978                    }
1979                }
1980                None => CopyFromSource::Url(from),
1981            }
1982        }
1983        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
1984    };
1985
1986    let params = match format {
1987        CopyFormat::Text => {
1988            only_available_with_csv(options.quote, "quote")?;
1989            only_available_with_csv(options.escape, "escape")?;
1990            only_available_with_csv(options.header, "HEADER")?;
1991            let delimiter =
1992                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
1993            let null = match options.null {
1994                Some(null) => Cow::from(null),
1995                None => Cow::from("\\N"),
1996            };
1997            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
1998        }
1999        CopyFormat::Csv => {
2000            let quote = extract_byte_param_value(options.quote, "quote")?;
2001            let escape = extract_byte_param_value(options.escape, "escape")?;
2002            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
2003            CopyFormatParams::Csv(
2004                CopyCsvFormatParams::try_new(
2005                    delimiter,
2006                    quote,
2007                    escape,
2008                    options.header,
2009                    options.null,
2010                )
2011                .map_err(|e| sql_err!("{}", e))?,
2012            )
2013        }
2014        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
2015        CopyFormat::Parquet => CopyFormatParams::Parquet,
2016    };
2017
2018    let filter = match (options.files, options.pattern) {
2019        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
2020        (Some(files), None) => Some(CopyFromFilter::Files(files)),
2021        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
2022        (None, None) => None,
2023    };
2024
2025    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
2026        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
2027    }
2028
2029    let table_name_string = table_name.full_name_str();
2030
2031    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;
2032
2033    let Some(mfp) = maybe_mfp else {
2034        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
2035    };
2036
2037    Ok(Plan::CopyFrom(CopyFromPlan {
2038        target_id: id,
2039        target_name: table_name_string,
2040        source,
2041        columns,
2042        source_desc,
2043        mfp,
2044        params,
2045        filter,
2046    }))
2047}
2048
2049fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2050    match v {
2051        Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2052        Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2053        None => Ok(None),
2054    }
2055}
2056
// Generates `CopyOptionExtracted`, the typed form of the options accepted by
// `COPY`. Each entry pairs an option name with the type its value is
// extracted as. `MaxFileSize` is the only entry that declares a default
// (`ByteSize::mb(256)`); the planners in this module read it directly, while
// the other options are handled as optional values.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
2070
2071pub fn plan_copy(
2072    scx: &StatementContext,
2073    CopyStatement {
2074        relation,
2075        direction,
2076        target,
2077        options,
2078    }: CopyStatement<Aug>,
2079) -> Result<Plan, PlanError> {
2080    let options = CopyOptionExtracted::try_from(options)?;
2081    // Parse any user-provided FORMAT option. If not provided, will default to
2082    // Text for COPY TO STDOUT and COPY FROM STDIN, but will error for COPY TO <expr>.
2083    let format = options
2084        .format
2085        .as_ref()
2086        .map(|format| match format.to_lowercase().as_str() {
2087            "text" => Ok(CopyFormat::Text),
2088            "csv" => Ok(CopyFormat::Csv),
2089            "binary" => Ok(CopyFormat::Binary),
2090            "parquet" => Ok(CopyFormat::Parquet),
2091            _ => sql_bail!("unknown FORMAT: {}", format),
2092        })
2093        .transpose()?;
2094
2095    match (&direction, &target) {
2096        (CopyDirection::To, CopyTarget::Stdout) => {
2097            if options.delimiter.is_some() {
2098                sql_bail!("COPY TO does not support DELIMITER option yet");
2099            }
2100            if options.quote.is_some() {
2101                sql_bail!("COPY TO does not support QUOTE option yet");
2102            }
2103            if options.null.is_some() {
2104                sql_bail!("COPY TO does not support NULL option yet");
2105            }
2106            match relation {
2107                CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
2108                CopyRelation::Select(stmt) => Ok(plan_select(
2109                    scx,
2110                    stmt,
2111                    &Params::empty(),
2112                    Some(format.unwrap_or(CopyFormat::Text)),
2113                )?),
2114                CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
2115                    scx,
2116                    stmt,
2117                    &Params::empty(),
2118                    Some(format.unwrap_or(CopyFormat::Text)),
2119                )?),
2120            }
2121        }
2122        (CopyDirection::From, target) => match relation {
2123            CopyRelation::Named { name, columns } => plan_copy_from(
2124                scx,
2125                target,
2126                name,
2127                columns,
2128                format.unwrap_or(CopyFormat::Text),
2129                options,
2130            ),
2131            _ => sql_bail!("COPY FROM {} not supported", target),
2132        },
2133        (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
2134            let format = match format {
2135                Some(inner) => inner,
2136                _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
2137            };
2138
2139            let stmt = match relation {
2140                CopyRelation::Named { name, columns } => {
2141                    if !columns.is_empty() {
2142                        // TODO(mouli): Add support for this
2143                        sql_bail!(
2144                            "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
2145                        );
2146                    }
2147                    // Generate a synthetic SELECT query that just gets the table
2148                    let query = Query {
2149                        ctes: CteBlock::empty(),
2150                        body: SetExpr::Table(name),
2151                        order_by: vec![],
2152                        limit: None,
2153                        offset: None,
2154                    };
2155                    SelectStatement { query, as_of: None }
2156                }
2157                CopyRelation::Select(stmt) => {
2158                    if !stmt.query.order_by.is_empty() {
2159                        sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
2160                    }
2161                    stmt
2162                }
2163                CopyRelation::Subscribe(_) => {
2164                    sql_bail!("COPY {} {} not supported", direction, target)
2165                }
2166            };
2167
2168            let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
2169            plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
2170        }
2171        _ => sql_bail!("COPY {} {} not supported", direction, target),
2172    }
2173}