mz_sql/plan/statement/
dml.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data manipulation language (DML).
11//!
12//! This module houses the handlers for statements that manipulate data, like
13//! `INSERT`, `SELECT`, `SUBSCRIBE`, and `COPY`.
14
15use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19
20use mz_arrow_util::builder::ArrowBuilder;
21use mz_expr::visit::Visit;
22use mz_expr::{MirRelationExpr, RowSetFinishing};
23use mz_ore::num::NonNeg;
24use mz_ore::soft_panic_or_log;
25use mz_ore::str::separated;
26use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
27use mz_repr::adt::numeric::NumericMaxScale;
28use mz_repr::bytes::ByteSize;
29use mz_repr::explain::{ExplainConfig, ExplainFormat};
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
32use mz_sql_parser::ast::{
33    CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
34    ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
35    ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
36    ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
37    SetExpr, SubscribeOutput, UnresolvedItemName,
38};
39use mz_sql_parser::ident;
40use mz_storage_types::sinks::{
41    KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
42    MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
43};
44
45use crate::ast::display::AstDisplay;
46use crate::ast::{
47    AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
48    DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
49    SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
50    UpdateStatement,
51};
52use crate::catalog::CatalogItemType;
53use crate::names::{Aug, ResolvedItemName};
54use crate::normalize;
55use crate::plan::query::{
56    ExprContext, QueryLifetime, offset_into_value, plan_as_of_or_up_to, plan_expr,
57};
58use crate::plan::scope::Scope;
59use crate::plan::statement::show::ShowSelect;
60use crate::plan::statement::{StatementContext, StatementDesc, ddl};
61use crate::plan::{
62    self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
63    ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
64};
65use crate::plan::{
66    CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
67    QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
68};
69use crate::plan::{CopyFromSource, with_options};
70use crate::session::vars::{self, ENABLE_COPY_FROM_REMOTE};
71
72// TODO(benesch): currently, describing a `SELECT` or `INSERT` query
73// plans the whole query to determine its shape and parameter types,
74// and then throws away that plan. If we were smarter, we'd stash that
75// plan somewhere so we don't have to recompute it when the query is
76// executed.
77
78pub fn describe_insert(
79    scx: &StatementContext,
80    InsertStatement {
81        table_name,
82        columns,
83        source,
84        returning,
85    }: InsertStatement<Aug>,
86) -> Result<StatementDesc, PlanError> {
87    let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
88    let desc = if returning.expr.is_empty() {
89        None
90    } else {
91        Some(returning.desc)
92    };
93    Ok(StatementDesc::new(desc))
94}
95
96pub fn plan_insert(
97    scx: &StatementContext,
98    InsertStatement {
99        table_name,
100        columns,
101        source,
102        returning,
103    }: InsertStatement<Aug>,
104    params: &Params,
105) -> Result<Plan, PlanError> {
106    let (id, mut expr, returning) =
107        query::plan_insert_query(scx, table_name, columns, source, returning)?;
108    expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
109    let returning = returning
110        .expr
111        .into_iter()
112        .map(|mut expr| {
113            expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
114            expr.lower_uncorrelated()
115        })
116        .collect::<Result<Vec<_>, _>>()?;
117
118    Ok(Plan::Insert(InsertPlan {
119        id,
120        values: expr,
121        returning,
122    }))
123}
124
125pub fn describe_delete(
126    scx: &StatementContext,
127    stmt: DeleteStatement<Aug>,
128) -> Result<StatementDesc, PlanError> {
129    query::plan_delete_query(scx, stmt)?;
130    Ok(StatementDesc::new(None))
131}
132
133pub fn plan_delete(
134    scx: &StatementContext,
135    stmt: DeleteStatement<Aug>,
136    params: &Params,
137) -> Result<Plan, PlanError> {
138    let rtw_plan = query::plan_delete_query(scx, stmt)?;
139    plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
140}
141
142pub fn describe_update(
143    scx: &StatementContext,
144    stmt: UpdateStatement<Aug>,
145) -> Result<StatementDesc, PlanError> {
146    query::plan_update_query(scx, stmt)?;
147    Ok(StatementDesc::new(None))
148}
149
150pub fn plan_update(
151    scx: &StatementContext,
152    stmt: UpdateStatement<Aug>,
153    params: &Params,
154) -> Result<Plan, PlanError> {
155    let rtw_plan = query::plan_update_query(scx, stmt)?;
156    plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
157}
158
159pub fn plan_read_then_write(
160    scx: &StatementContext,
161    kind: MutationKind,
162    params: &Params,
163    query::ReadThenWritePlan {
164        id,
165        mut selection,
166        finishing,
167        assignments,
168    }: query::ReadThenWritePlan,
169) -> Result<Plan, PlanError> {
170    selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
171    let mut assignments_outer = BTreeMap::new();
172    for (idx, mut set) in assignments {
173        set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
174        let set = set.lower_uncorrelated()?;
175        assignments_outer.insert(idx, set);
176    }
177
178    Ok(Plan::ReadThenWrite(ReadThenWritePlan {
179        id,
180        selection,
181        finishing,
182        assignments: assignments_outer,
183        kind,
184        returning: Vec::new(),
185    }))
186}
187
188pub fn describe_select(
189    scx: &StatementContext,
190    stmt: SelectStatement<Aug>,
191) -> Result<StatementDesc, PlanError> {
192    if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
193        return Ok(StatementDesc::new(Some(desc)));
194    }
195
196    let query::PlannedRootQuery { desc, .. } =
197        query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
198    Ok(StatementDesc::new(Some(desc)))
199}
200
201pub fn plan_select(
202    scx: &StatementContext,
203    select: SelectStatement<Aug>,
204    params: &Params,
205    copy_to: Option<CopyFormat>,
206) -> Result<Plan, PlanError> {
207    if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
208        return Ok(Plan::SideEffectingFunc(f));
209    }
210
211    let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
212    Ok(Plan::Select(plan))
213}
214
215fn plan_select_inner(
216    scx: &StatementContext,
217    select: SelectStatement<Aug>,
218    params: &Params,
219    copy_to: Option<CopyFormat>,
220) -> Result<(SelectPlan, RelationDesc), PlanError> {
221    let when = query::plan_as_of(scx, select.as_of.clone())?;
222    let lifetime = QueryLifetime::OneShot;
223    let query::PlannedRootQuery {
224        mut expr,
225        desc,
226        finishing,
227        scope: _,
228    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
229    expr.bind_parameters(scx, lifetime, params)?;
230
231    // OFFSET clauses in `expr` should become constants with the above binding of parameters.
232    // Let's check this and simplify them to literals.
233    expr.try_visit_mut_pre(&mut |expr| {
234        if let HirRelationExpr::TopK { offset, .. } = expr {
235            let offset_value = offset_into_value(offset.take())?;
236            *offset = HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64);
237        }
238        Ok::<(), PlanError>(())
239    })?;
240    // (We don't need to simplify LIMIT clauses in `expr`, because we can handle non-constant
241    // expressions there. If they happen to be simplifiable to literals, then the optimizer will do
242    // so later.)
243
244    // We need to concretize the `limit` and `offset` of the RowSetFinishing, so that we go from
245    // `RowSetFinishing<HirScalarExpr, HirScalarExpr>` to `RowSetFinishing`.
246    // This involves binding parameters and evaluating each expression to a number.
247    // (This should be possible even for `limit` here, because we are at the top level of a SELECT,
248    // so this `limit` has to be a constant.)
249    let limit = match finishing.limit {
250        None => None,
251        Some(mut limit) => {
252            limit.bind_parameters(scx, lifetime, params)?;
253            // TODO: Call `try_into_literal_int64` instead of `as_literal`.
254            let Some(limit) = limit.as_literal() else {
255                sql_bail!(
256                    "Top-level LIMIT must be a constant expression, got {}",
257                    limit
258                )
259            };
260            match limit {
261                Datum::Null => None,
262                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
263                _ => {
264                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
265                    sql_bail!("LIMIT must be a non-negative INT or NULL")
266                }
267            }
268        }
269    };
270    let offset = {
271        let mut offset = finishing.offset.clone();
272        offset.bind_parameters(scx, lifetime, params)?;
273        let offset = offset_into_value(offset.take())?;
274        offset
275            .try_into()
276            .expect("checked in offset_into_value that it is not negative")
277    };
278
279    let plan = SelectPlan {
280        source: expr,
281        when,
282        finishing: RowSetFinishing {
283            limit,
284            offset,
285            project: finishing.project,
286            order_by: finishing.order_by,
287        },
288        copy_to,
289        select: Some(Box::new(select)),
290    };
291
292    Ok((plan, desc))
293}
294
295pub fn describe_explain_plan(
296    scx: &StatementContext,
297    explain: ExplainPlanStatement<Aug>,
298) -> Result<StatementDesc, PlanError> {
299    let mut relation_desc = RelationDesc::builder();
300
301    match explain.stage() {
302        ExplainStage::RawPlan => {
303            let name = "Raw Plan";
304            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
305        }
306        ExplainStage::DecorrelatedPlan => {
307            let name = "Decorrelated Plan";
308            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
309        }
310        ExplainStage::LocalPlan => {
311            let name = "Locally Optimized Plan";
312            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
313        }
314        ExplainStage::GlobalPlan => {
315            let name = "Optimized Plan";
316            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
317        }
318        ExplainStage::PhysicalPlan => {
319            let name = "Physical Plan";
320            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
321        }
322        ExplainStage::Trace => {
323            relation_desc = relation_desc
324                .with_column("Time", SqlScalarType::UInt64.nullable(false))
325                .with_column("Path", SqlScalarType::String.nullable(false))
326                .with_column("Plan", SqlScalarType::String.nullable(false));
327        }
328        ExplainStage::PlanInsights => {
329            let name = "Plan Insights";
330            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
331        }
332    };
333    let relation_desc = relation_desc.finish();
334
335    Ok(
336        StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
337            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
338            _ => vec![],
339        }),
340    )
341}
342
343pub fn describe_explain_pushdown(
344    scx: &StatementContext,
345    statement: ExplainPushdownStatement<Aug>,
346) -> Result<StatementDesc, PlanError> {
347    let relation_desc = RelationDesc::builder()
348        .with_column("Source", SqlScalarType::String.nullable(false))
349        .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
350        .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
351        .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
352        .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
353        .finish();
354
355    Ok(
356        StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
357            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
358            _ => vec![],
359        }),
360    )
361}
362
363pub fn describe_explain_analyze_object(
364    _scx: &StatementContext,
365    statement: ExplainAnalyzeObjectStatement<Aug>,
366) -> Result<StatementDesc, PlanError> {
367    if statement.as_sql {
368        let relation_desc = RelationDesc::builder()
369            .with_column("SQL", SqlScalarType::String.nullable(false))
370            .finish();
371        return Ok(StatementDesc::new(Some(relation_desc)));
372    }
373
374    match statement.properties {
375        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
376            properties,
377            skew,
378        }) => {
379            let mut relation_desc = RelationDesc::builder()
380                .with_column("operator", SqlScalarType::String.nullable(false));
381
382            if skew {
383                relation_desc =
384                    relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
385            }
386
387            let mut seen_properties = BTreeSet::new();
388            for property in properties {
389                // handle each property only once (belt and suspenders)
390                if !seen_properties.insert(property) {
391                    continue;
392                }
393
394                match property {
395                    ExplainAnalyzeComputationProperty::Memory if skew => {
396                        let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
397                        relation_desc = relation_desc
398                            .with_column("memory_ratio", numeric.clone())
399                            .with_column("worker_memory", SqlScalarType::String.nullable(true))
400                            .with_column("avg_memory", SqlScalarType::String.nullable(true))
401                            .with_column("total_memory", SqlScalarType::String.nullable(true))
402                            .with_column("records_ratio", numeric.clone())
403                            .with_column("worker_records", numeric.clone())
404                            .with_column("avg_records", numeric.clone())
405                            .with_column("total_records", numeric);
406                    }
407                    ExplainAnalyzeComputationProperty::Memory => {
408                        relation_desc = relation_desc
409                            .with_column("total_memory", SqlScalarType::String.nullable(true))
410                            .with_column(
411                                "total_records",
412                                SqlScalarType::Numeric { max_scale: None }.nullable(true),
413                            );
414                    }
415                    ExplainAnalyzeComputationProperty::Cpu => {
416                        if skew {
417                            relation_desc = relation_desc
418                                .with_column(
419                                    "cpu_ratio",
420                                    SqlScalarType::Numeric { max_scale: None }.nullable(true),
421                                )
422                                .with_column(
423                                    "worker_elapsed",
424                                    SqlScalarType::Interval.nullable(true),
425                                )
426                                .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
427                        }
428                        relation_desc = relation_desc
429                            .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
430                    }
431                }
432            }
433
434            let relation_desc = relation_desc.finish();
435            Ok(StatementDesc::new(Some(relation_desc)))
436        }
437        ExplainAnalyzeProperty::Hints => {
438            let relation_desc = RelationDesc::builder()
439                .with_column("operator", SqlScalarType::String.nullable(true))
440                .with_column("levels", SqlScalarType::Int64.nullable(true))
441                .with_column("to_cut", SqlScalarType::Int64.nullable(true))
442                .with_column("hint", SqlScalarType::Float64.nullable(true))
443                .with_column("savings", SqlScalarType::String.nullable(true))
444                .finish();
445            Ok(StatementDesc::new(Some(relation_desc)))
446        }
447    }
448}
449
450pub fn describe_explain_analyze_cluster(
451    _scx: &StatementContext,
452    statement: ExplainAnalyzeClusterStatement,
453) -> Result<StatementDesc, PlanError> {
454    if statement.as_sql {
455        let relation_desc = RelationDesc::builder()
456            .with_column("SQL", SqlScalarType::String.nullable(false))
457            .finish();
458        return Ok(StatementDesc::new(Some(relation_desc)));
459    }
460
461    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
462
463    let mut relation_desc = RelationDesc::builder()
464        .with_column("object", SqlScalarType::String.nullable(false))
465        .with_column("global_id", SqlScalarType::String.nullable(false));
466
467    if skew {
468        relation_desc =
469            relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
470    }
471
472    let mut seen_properties = BTreeSet::new();
473    for property in properties {
474        // handle each property only once (belt and suspenders)
475        if !seen_properties.insert(property) {
476            continue;
477        }
478
479        match property {
480            ExplainAnalyzeComputationProperty::Memory if skew => {
481                let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
482                relation_desc = relation_desc
483                    .with_column("max_operator_memory_ratio", numeric.clone())
484                    .with_column("worker_memory", SqlScalarType::String.nullable(true))
485                    .with_column("avg_memory", SqlScalarType::String.nullable(true))
486                    .with_column("total_memory", SqlScalarType::String.nullable(true))
487                    .with_column("max_operator_records_ratio", numeric.clone())
488                    .with_column("worker_records", numeric.clone())
489                    .with_column("avg_records", numeric.clone())
490                    .with_column("total_records", numeric);
491            }
492            ExplainAnalyzeComputationProperty::Memory => {
493                relation_desc = relation_desc
494                    .with_column("total_memory", SqlScalarType::String.nullable(true))
495                    .with_column(
496                        "total_records",
497                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
498                    );
499            }
500            ExplainAnalyzeComputationProperty::Cpu if skew => {
501                relation_desc = relation_desc
502                    .with_column(
503                        "max_operator_cpu_ratio",
504                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
505                    )
506                    .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
507                    .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
508                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
509            }
510            ExplainAnalyzeComputationProperty::Cpu => {
511                relation_desc = relation_desc
512                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
513            }
514        }
515    }
516
517    Ok(StatementDesc::new(Some(relation_desc.finish())))
518}
519
520pub fn describe_explain_timestamp(
521    scx: &StatementContext,
522    ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
523) -> Result<StatementDesc, PlanError> {
524    let relation_desc = RelationDesc::builder()
525        .with_column("Timestamp", SqlScalarType::String.nullable(false))
526        .finish();
527
528    Ok(StatementDesc::new(Some(relation_desc))
529        .with_params(describe_select(scx, select)?.param_types))
530}
531
532pub fn describe_explain_schema(
533    _: &StatementContext,
534    ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
535) -> Result<StatementDesc, PlanError> {
536    let relation_desc = RelationDesc::builder()
537        .with_column("Schema", SqlScalarType::String.nullable(false))
538        .finish();
539    Ok(StatementDesc::new(Some(relation_desc)))
540}
541
// Options accepted in the `WITH (...)` clause of `EXPLAIN ... PLAN`. Each entry gives the option
// name, the type it is extracted as, and its default.
//
// Most options are plain `bool`s defaulting to `false`. Currently, there are two reasons why a
// flag should be `Option<bool>` instead of simply `bool`:
// - When it's an override of a global feature flag, for example optimizer feature flags. In this
//   case, we need not just false and true, but also None to say "take the value of the global
//   flag".
// - When its default depends on whether SOFT_ASSERTIONS are enabled. For example, when `Arity` is
//   not explicitly given in the EXPLAIN command, then we'd like staging and prod to default to
//   true, but otherwise we'd like to default to false.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
584
585impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
586    type Error = PlanError;
587
588    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
589        // If `WITH(raw)` is specified, ensure that the config will be as
590        // representative for the original plan as possible.
591        if v.raw {
592            v.raw_plans = true;
593            v.raw_syntax = true;
594        }
595
596        // Certain config should default to be enabled in release builds running on
597        // staging or prod (where SOFT_ASSERTIONS are turned off).
598        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();
599
600        Ok(ExplainConfig {
601            arity: v.arity.unwrap_or(enable_on_prod),
602            cardinality: v.cardinality,
603            column_names: v.column_names,
604            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
605            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
606            join_impls: v.join_implementations,
607            keys: v.keys,
608            linear_chains: !v.raw_plans && v.linear_chains,
609            no_fast_path: v.no_fast_path,
610            no_notices: v.no_notices,
611            node_ids: v.node_identifiers,
612            non_negative: v.non_negative,
613            raw_plans: v.raw_plans,
614            raw_syntax: v.raw_syntax,
615            verbose_syntax: false,
616            redacted: v.redacted,
617            subtree_size: v.subtree_size,
618            equivalences: v.equivalences,
619            timing: v.timing,
620            types: v.types,
621            // The ones that are initialized with `Default::default()` are not wired up to EXPLAIN.
622            features: OptimizerFeatureOverrides {
623                enable_guard_subquery_tablefunc: Default::default(),
624                enable_eager_delta_joins: v.enable_eager_delta_joins,
625                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
626                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
627                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
628                enable_consolidate_after_union_negate: Default::default(),
629                enable_reduce_mfp_fusion: Default::default(),
630                enable_cardinality_estimates: Default::default(),
631                persist_fast_path_limit: Default::default(),
632                reoptimize_imported_views: v.reoptimize_imported_views,
633                enable_reduce_reduction: Default::default(),
634                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
635                enable_projection_pushdown_after_relation_cse: v
636                    .enable_projection_pushdown_after_relation_cse,
637                enable_less_reduce_in_eqprop: Default::default(),
638                enable_dequadratic_eqprop_map: Default::default(),
639                enable_eq_classes_withholding_errors: Default::default(),
640                enable_fast_path_plan_insights: Default::default(),
641            },
642        })
643    }
644}
645
646fn plan_explainee(
647    scx: &StatementContext,
648    explainee: Explainee<Aug>,
649    params: &Params,
650) -> Result<plan::Explainee, PlanError> {
651    use crate::plan::ExplaineeStatement;
652
653    let is_replan = matches!(
654        explainee,
655        Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
656    );
657
658    let explainee = match explainee {
659        Explainee::View(name) | Explainee::ReplanView(name) => {
660            let item = scx.get_item_by_resolved_name(&name)?;
661            let item_type = item.item_type();
662            if item_type != CatalogItemType::View {
663                sql_bail!("Expected {name} to be a view, not a {item_type}");
664            }
665            match is_replan {
666                true => crate::plan::Explainee::ReplanView(item.id()),
667                false => crate::plan::Explainee::View(item.id()),
668            }
669        }
670        Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
671            let item = scx.get_item_by_resolved_name(&name)?;
672            let item_type = item.item_type();
673            if item_type != CatalogItemType::MaterializedView {
674                sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
675            }
676            match is_replan {
677                true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
678                false => crate::plan::Explainee::MaterializedView(item.id()),
679            }
680        }
681        Explainee::Index(name) | Explainee::ReplanIndex(name) => {
682            let item = scx.get_item_by_resolved_name(&name)?;
683            let item_type = item.item_type();
684            if item_type != CatalogItemType::Index {
685                sql_bail!("Expected {name} to be an index, not a {item_type}");
686            }
687            match is_replan {
688                true => crate::plan::Explainee::ReplanIndex(item.id()),
689                false => crate::plan::Explainee::Index(item.id()),
690            }
691        }
692        Explainee::Select(select, broken) => {
693            let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
694            crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
695        }
696        Explainee::CreateView(mut stmt, broken) => {
697            if stmt.if_exists != IfExistsBehavior::Skip {
698                // If we don't force this parameter to Skip planning will
699                // fail for names that already exist in the catalog. This
700                // can happen even in `Replace` mode if the existing item
701                // has dependencies.
702                stmt.if_exists = IfExistsBehavior::Skip;
703            } else {
704                sql_bail!(
705                    "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
706                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
707                );
708            }
709
710            let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
711                sql_bail!("expected CreateViewPlan plan");
712            };
713
714            crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
715        }
716        Explainee::CreateMaterializedView(mut stmt, broken) => {
717            if stmt.if_exists != IfExistsBehavior::Skip {
718                // If we don't force this parameter to Skip planning will
719                // fail for names that already exist in the catalog. This
720                // can happen even in `Replace` mode if the existing item
721                // has dependencies.
722                stmt.if_exists = IfExistsBehavior::Skip;
723            } else {
724                sql_bail!(
725                    "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
726                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
727                );
728            }
729
730            let Plan::CreateMaterializedView(plan) =
731                ddl::plan_create_materialized_view(scx, *stmt)?
732            else {
733                sql_bail!("expected CreateMaterializedViewPlan plan");
734            };
735
736            crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
737                broken,
738                plan,
739            })
740        }
741        Explainee::CreateIndex(mut stmt, broken) => {
742            if !stmt.if_not_exists {
743                // If we don't force this parameter to true planning will
744                // fail for index items that already exist in the catalog.
745                stmt.if_not_exists = true;
746            } else {
747                sql_bail!(
748                    "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
749                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
750                );
751            }
752
753            let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
754                sql_bail!("expected CreateIndexPlan plan");
755            };
756
757            crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
758        }
759    };
760
761    Ok(explainee)
762}
763
764pub fn plan_explain_plan(
765    scx: &StatementContext,
766    explain: ExplainPlanStatement<Aug>,
767    params: &Params,
768) -> Result<Plan, PlanError> {
769    let (format, verbose_syntax) = match explain.format() {
770        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
771        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
772        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
773        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
774    };
775    let stage = explain.stage();
776
777    // Plan ExplainConfig.
778    let mut config = {
779        let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
780
781        if !scx.catalog.system_vars().persist_stats_filter_enabled() {
782            // If filtering is disabled, explain plans should not include pushdown info.
783            with_options.filter_pushdown = Some(false);
784        }
785
786        ExplainConfig::try_from(with_options)?
787    };
788    config.verbose_syntax = verbose_syntax;
789
790    let explainee = plan_explainee(scx, explain.explainee, params)?;
791
792    Ok(Plan::ExplainPlan(ExplainPlanPlan {
793        stage,
794        format,
795        config,
796        explainee,
797    }))
798}
799
800pub fn plan_explain_schema(
801    scx: &StatementContext,
802    explain_schema: ExplainSinkSchemaStatement<Aug>,
803) -> Result<Plan, PlanError> {
804    let ExplainSinkSchemaStatement {
805        schema_for,
806        // Parser limits to JSON.
807        format: _,
808        mut statement,
809    } = explain_schema;
810
811    // Force the sink's name to one that's guaranteed not to exist, by virtue of
812    // being a non-existent item in a schema under the system's control, so that
813    // `plan_create_sink` doesn't complain about the name already existing.
814    statement.name = Some(UnresolvedItemName::qualified(&[
815        ident!("mz_catalog"),
816        ident!("mz_explain_schema"),
817    ]));
818
819    crate::pure::purify_create_sink_avro_doc_on_options(
820        scx.catalog,
821        *statement.from.item_id(),
822        &mut statement.format,
823    )?;
824
825    match ddl::plan_create_sink(scx, statement)? {
826        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
827            StorageSinkConnection::Kafka(KafkaSinkConnection {
828                format:
829                    KafkaSinkFormat {
830                        key_format,
831                        value_format:
832                            KafkaSinkFormatType::Avro {
833                                schema: value_schema,
834                                ..
835                            },
836                        ..
837                    },
838                ..
839            }) => {
840                let schema = match schema_for {
841                    ExplainSinkSchemaFor::Key => key_format
842                        .and_then(|f| match f {
843                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
844                            _ => None,
845                        })
846                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
847                    ExplainSinkSchemaFor::Value => value_schema,
848                };
849
850                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
851                    sink_from: sink.from,
852                    json_schema: schema,
853                }))
854            }
855            _ => bail_unsupported!(
856                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
857            ),
858        },
859        _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
860    }
861}
862
863pub fn plan_explain_pushdown(
864    scx: &StatementContext,
865    statement: ExplainPushdownStatement<Aug>,
866    params: &Params,
867) -> Result<Plan, PlanError> {
868    scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
869    let explainee = plan_explainee(scx, statement.explainee, params)?;
870    Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
871}
872
873pub fn plan_explain_analyze_object(
874    scx: &StatementContext,
875    statement: ExplainAnalyzeObjectStatement<Aug>,
876    params: &Params,
877) -> Result<Plan, PlanError> {
878    let explainee_name = statement
879        .explainee
880        .name()
881        .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
882        .full_name_str();
883    let explainee = plan_explainee(scx, statement.explainee, params)?;
884
885    match explainee {
886        plan::Explainee::Index(_index_id) => (),
887        plan::Explainee::MaterializedView(_item_id) => (),
888        _ => {
889            return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
890        }
891    };
892
893    // generate SQL query
894
895    /* WITH {CTEs}
896       SELECT REPEAT(' ', nesting * 2) || operator AS operator
897             {columns}
898        FROM      mz_introspection.mz_lir_mapping mlm
899             JOIN {from} USING (lir_id)
900             JOIN mz_introspection.mz_mappable_objects mo
901               ON (mlm.global_id = mo.global_id)
902       WHERE     mo.name = '{plan.explainee_name}'
903             AND {predicates}
904       ORDER BY lir_id DESC
905    */
906    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
907    let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
908    let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
909    let mut predicates = vec![format!("mo.name = '{}'", explainee_name)];
910    let mut order_by = vec!["mlm.lir_id DESC"];
911
912    match statement.properties {
913        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
914            properties,
915            skew,
916        }) => {
917            let mut worker_id = None;
918            let mut seen_properties = BTreeSet::new();
919            for property in properties {
920                // handle each property only once (belt and suspenders)
921                if !seen_properties.insert(property) {
922                    continue;
923                }
924
925                match property {
926                    ExplainAnalyzeComputationProperty::Memory => {
927                        ctes.push((
928                            "summary_memory",
929                            r#"
930  SELECT mlm.global_id AS global_id,
931         mlm.lir_id AS lir_id,
932         SUM(mas.size) AS total_memory,
933         SUM(mas.records) AS total_records,
934         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
935         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
936    FROM            mz_introspection.mz_lir_mapping mlm
937         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
938               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
939                 ON (mas.operator_id = valid_id)
940GROUP BY mlm.global_id, mlm.lir_id"#,
941                        ));
942                        from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");
943
944                        if skew {
945                            ctes.push((
946                                "per_worker_memory",
947                                r#"
948  SELECT mlm.global_id AS global_id,
949         mlm.lir_id AS lir_id,
950         mas.worker_id AS worker_id,
951         SUM(mas.size) AS worker_memory,
952         SUM(mas.records) AS worker_records
953    FROM            mz_introspection.mz_lir_mapping mlm
954         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
955               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
956                 ON (mas.operator_id = valid_id)
957GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
958                            ));
959                            from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");
960
961                            if let Some(worker_id) = worker_id {
962                                predicates.push(format!("pwm.worker_id = {worker_id}"));
963                            } else {
964                                worker_id = Some("pwm.worker_id");
965                                columns.push("pwm.worker_id AS worker_id");
966                                order_by.push("worker_id");
967                            }
968
969                            columns.extend([
970                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
971                                "pg_size_pretty(pwm.worker_memory) AS worker_memory",
972                                "pg_size_pretty(sm.avg_memory) AS avg_memory",
973                                "pg_size_pretty(sm.total_memory) AS total_memory",
974                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
975                                "pwm.worker_records AS worker_records",
976                                "sm.avg_records AS avg_records",
977                                "sm.total_records AS total_records",
978                            ]);
979                        } else {
980                            columns.extend([
981                                "pg_size_pretty(sm.total_memory) AS total_memory",
982                                "sm.total_records AS total_records",
983                            ]);
984                        }
985                    }
986                    ExplainAnalyzeComputationProperty::Cpu => {
987                        ctes.push((
988                            "summary_cpu",
989                            r#"
990  SELECT mlm.global_id AS global_id,
991         mlm.lir_id AS lir_id,
992         SUM(mse.elapsed_ns) AS total_ns,
993         CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
994    FROM            mz_introspection.mz_lir_mapping mlm
995         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
996               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
997                 ON (mse.id = valid_id)
998GROUP BY mlm.global_id, mlm.lir_id"#,
999                        ));
1000                        from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");
1001
1002                        if skew {
1003                            ctes.push((
1004                                "per_worker_cpu",
1005                                r#"
1006  SELECT mlm.global_id AS global_id,
1007         mlm.lir_id AS lir_id,
1008         mse.worker_id AS worker_id,
1009         SUM(mse.elapsed_ns) AS worker_ns
1010    FROM            mz_introspection.mz_lir_mapping mlm
1011         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1012               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1013                 ON (mse.id = valid_id)
1014GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1015                            ));
1016                            from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");
1017
1018                            if let Some(worker_id) = worker_id {
1019                                predicates.push(format!("pwc.worker_id = {worker_id}"));
1020                            } else {
1021                                worker_id = Some("pwc.worker_id");
1022                                columns.push("pwc.worker_id AS worker_id");
1023                                order_by.push("worker_id");
1024                            }
1025
1026                            columns.extend([
1027                                "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
1028                                "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
1029                                "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
1030                            ]);
1031                        }
1032                        columns.push(
1033                            "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
1034                        );
1035                    }
1036                }
1037            }
1038        }
1039        ExplainAnalyzeProperty::Hints => {
1040            columns.extend([
1041                "megsa.levels AS levels",
1042                "megsa.to_cut AS to_cut",
1043                "megsa.hint AS hint",
1044                "pg_size_pretty(megsa.savings) AS savings",
1045            ]);
1046            from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
1047            "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
1048             mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
1049        }
1050    }
1051
1052    from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");
1053
1054    let ctes = if !ctes.is_empty() {
1055        format!(
1056            "WITH {}",
1057            separated(
1058                ",\n",
1059                ctes.iter()
1060                    .map(|(name, defn)| format!("{name} AS ({defn})"))
1061            )
1062        )
1063    } else {
1064        String::new()
1065    };
1066    let columns = separated(", ", columns);
1067    let from = separated(" ", from);
1068    let predicates = separated(" AND ", predicates);
1069    let order_by = separated(", ", order_by);
1070    let query = format!(
1071        r#"{ctes}
1072SELECT {columns}
1073FROM {from}
1074WHERE {predicates}
1075ORDER BY {order_by}"#
1076    );
1077
1078    if statement.as_sql {
1079        let rows = vec![Row::pack_slice(&[Datum::String(
1080            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1081                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1082            })?,
1083        )])];
1084        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1085
1086        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1087    } else {
1088        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1089        show_select.plan()
1090    }
1091}
1092
1093pub fn plan_explain_analyze_cluster(
1094    scx: &StatementContext,
1095    statement: ExplainAnalyzeClusterStatement,
1096    _params: &Params,
1097) -> Result<Plan, PlanError> {
1098    // object string
1099    // worker_id uint64        (if           skew)
1100    // memory_ratio numeric    (if memory && skew)
1101    // worker_memory string    (if memory && skew)
1102    // avg_memory string       (if memory && skew)
1103    // total_memory string     (if memory)
1104    // records_ratio numeric   (if memory && skew)
1105    // worker_records          (if memory && skew)
1106    // avg_records numeric     (if memory && skew)
1107    // total_records numeric   (if memory)
1108    // cpu_ratio numeric       (if cpu    && skew)
1109    // worker_elapsed interval (if cpu    && skew)
1110    // avg_elapsed interval    (if cpu    && skew)
1111    // total_elapsed interval  (if cpu)
1112
1113    /* WITH {CTEs}
1114       SELECT mo.name AS object
1115             {columns}
1116        FROM mz_introspection.mz_mappable_objects mo
1117             {from}
1118       WHERE {predicates}
1119       ORDER BY {order_by}, mo.name DESC
1120    */
1121    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
1122    let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
1123    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
1124    let mut predicates = vec![];
1125    let mut order_by = vec![];
1126
1127    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
1128    let mut worker_id = None;
1129    let mut seen_properties = BTreeSet::new();
1130    for property in properties {
1131        // handle each property only once (belt and suspenders)
1132        if !seen_properties.insert(property) {
1133            continue;
1134        }
1135
1136        match property {
1137            ExplainAnalyzeComputationProperty::Memory => {
1138                if skew {
1139                    let mut set_worker_id = false;
1140                    if let Some(worker_id) = worker_id {
1141                        // join condition if we're showing skew for more than one property
1142                        predicates.push(format!("om.worker_id = {worker_id}"));
1143                    } else {
1144                        worker_id = Some("om.worker_id");
1145                        columns.push("om.worker_id AS worker_id");
1146                        set_worker_id = true; // we'll add ourselves to `order_by` later
1147                    };
1148
1149                    // computes the average memory per LIR operator (for per operator ratios)
1150                    ctes.push((
1151                    "per_operator_memory_summary",
1152                    r#"
1153SELECT mlm.global_id AS global_id,
1154       mlm.lir_id AS lir_id,
1155       SUM(mas.size) AS total_memory,
1156       SUM(mas.records) AS total_records,
1157       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
1158       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
1159FROM        mz_introspection.mz_lir_mapping mlm
1160 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1161       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1162         ON (mas.operator_id = valid_id)
1163GROUP BY mlm.global_id, mlm.lir_id"#,
1164                ));
1165
1166                    // computes the memory per worker in a per operator way
1167                    ctes.push((
1168                    "per_operator_memory_per_worker",
1169                    r#"
1170SELECT mlm.global_id AS global_id,
1171       mlm.lir_id AS lir_id,
1172       mas.worker_id AS worker_id,
1173       SUM(mas.size) AS worker_memory,
1174       SUM(mas.records) AS worker_records
1175FROM        mz_introspection.mz_lir_mapping mlm
1176 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1177       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1178         ON (mas.operator_id = valid_id)
1179GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
1180                    ));
1181
1182                    // computes memory ratios per worker per operator
1183                    ctes.push((
1184                    "per_operator_memory_ratios",
1185                    r#"
1186SELECT pompw.global_id AS global_id,
1187       pompw.lir_id AS lir_id,
1188       pompw.worker_id AS worker_id,
1189       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
1190       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
1191  FROM      per_operator_memory_per_worker pompw
1192       JOIN per_operator_memory_summary poms
1193         USING (global_id, lir_id)
1194"#,
1195                    ));
1196
1197                    // summarizes each object, per worker
1198                    ctes.push((
1199                        "object_memory",
1200                        r#"
1201SELECT pompw.global_id AS global_id,
1202       pompw.worker_id AS worker_id,
1203       MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
1204       MAX(pomr.records_ratio) AS max_operator_records_ratio,
1205       SUM(pompw.worker_memory) AS worker_memory,
1206       SUM(pompw.worker_records) AS worker_records
1207FROM        per_operator_memory_per_worker pompw
1208     JOIN   per_operator_memory_ratios pomr
1209     USING (global_id, worker_id, lir_id)
1210GROUP BY pompw.global_id, pompw.worker_id
1211"#,
1212                    ));
1213
1214                    // summarizes each worker
1215                    ctes.push(("object_average_memory", r#"
1216SELECT om.global_id AS global_id,
1217       SUM(om.worker_memory) AS total_memory,
1218       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
1219       SUM(om.worker_records) AS total_records,
1220       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
1221  FROM object_memory om
1222GROUP BY om.global_id"#));
1223
1224                    from.push("LEFT JOIN object_memory om USING (global_id)");
1225                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");
1226
1227                    columns.extend([
1228                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
1229                        "pg_size_pretty(om.worker_memory) AS worker_memory",
1230                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
1231                        "pg_size_pretty(oam.total_memory) AS total_memory",
1232                        "om.max_operator_records_ratio AS max_operator_records_ratio",
1233                        "om.worker_records AS worker_records",
1234                        "oam.avg_records AS avg_records",
1235                        "oam.total_records AS total_records",
1236                    ]);
1237
1238                    order_by.extend([
1239                        "max_operator_memory_ratio DESC",
1240                        "max_operator_records_ratio DESC",
1241                        "worker_memory DESC",
1242                        "worker_records DESC",
1243                    ]);
1244
1245                    if set_worker_id {
1246                        order_by.push("worker_id");
1247                    }
1248                } else {
1249                    // no skew, so just compute totals
1250                    ctes.push((
1251                        "per_operator_memory_totals",
1252                        r#"
1253    SELECT mlm.global_id AS global_id,
1254           mlm.lir_id AS lir_id,
1255           SUM(mas.size) AS total_memory,
1256           SUM(mas.records) AS total_records
1257    FROM        mz_introspection.mz_lir_mapping mlm
1258     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1259           JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1260             ON (mas.operator_id = valid_id)
1261    GROUP BY mlm.global_id, mlm.lir_id"#,
1262                    ));
1263
1264                    ctes.push((
1265                        "object_memory_totals",
1266                        r#"
1267SELECT pomt.global_id AS global_id,
1268       SUM(pomt.total_memory) AS total_memory,
1269       SUM(pomt.total_records) AS total_records
1270FROM per_operator_memory_totals pomt
1271GROUP BY pomt.global_id
1272"#,
1273                    ));
1274
1275                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
1276                    columns.extend([
1277                        "pg_size_pretty(omt.total_memory) AS total_memory",
1278                        "omt.total_records AS total_records",
1279                    ]);
1280                    order_by.extend(["total_memory DESC", "total_records DESC"]);
1281                }
1282            }
1283            ExplainAnalyzeComputationProperty::Cpu => {
1284                if skew {
1285                    let mut set_worker_id = false;
1286                    if let Some(worker_id) = worker_id {
1287                        // join condition if we're showing skew for more than one property
1288                        predicates.push(format!("oc.worker_id = {worker_id}"));
1289                    } else {
1290                        worker_id = Some("oc.worker_id");
1291                        columns.push("oc.worker_id AS worker_id");
1292                        set_worker_id = true; // we'll add ourselves to `order_by` later
1293                    };
1294
1295                    // computes the average memory per LIR operator (for per operator ratios)
1296                    ctes.push((
1297    "per_operator_cpu_summary",
1298    r#"
1299SELECT mlm.global_id AS global_id,
1300       mlm.lir_id AS lir_id,
1301       SUM(mse.elapsed_ns) AS total_ns,
1302       CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
1303FROM       mz_introspection.mz_lir_mapping mlm
1304CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1305      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1306        ON (mse.id = valid_id)
1307GROUP BY mlm.global_id, mlm.lir_id"#,
1308));
1309
1310                    // computes the CPU per worker in a per operator way
1311                    ctes.push((
1312                        "per_operator_cpu_per_worker",
1313                        r#"
1314SELECT mlm.global_id AS global_id,
1315       mlm.lir_id AS lir_id,
1316       mse.worker_id AS worker_id,
1317       SUM(mse.elapsed_ns) AS worker_ns
1318FROM       mz_introspection.mz_lir_mapping mlm
1319CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1320      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1321        ON (mse.id = valid_id)
1322GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1323                    ));
1324
1325                    // computes CPU ratios per worker per operator
1326                    ctes.push((
1327                        "per_operator_cpu_ratios",
1328                        r#"
1329SELECT pocpw.global_id AS global_id,
1330       pocpw.lir_id AS lir_id,
1331       pocpw.worker_id AS worker_id,
1332       CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
1333FROM      per_operator_cpu_per_worker pocpw
1334     JOIN per_operator_cpu_summary pocs
1335     USING (global_id, lir_id)
1336"#,
1337                    ));
1338
1339                    // summarizes each object, per worker
1340                    ctes.push((
1341                        "object_cpu",
1342                        r#"
1343SELECT pocpw.global_id AS global_id,
1344       pocpw.worker_id AS worker_id,
1345       MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
1346       SUM(pocpw.worker_ns) AS worker_ns
1347FROM      per_operator_cpu_per_worker pocpw
1348     JOIN per_operator_cpu_ratios pomr
1349     USING (global_id, worker_id, lir_id)
1350GROUP BY pocpw.global_id, pocpw.worker_id
1351"#,
1352                    ));
1353
1354                    // summarizes each worker
1355                    ctes.push((
1356                        "object_average_cpu",
1357                        r#"
1358SELECT oc.global_id AS global_id,
1359       SUM(oc.worker_ns) AS total_ns,
1360       CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
1361  FROM object_cpu oc
1362GROUP BY oc.global_id"#,));
1363
1364                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
1365                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");
1366
1367                    columns.extend([
1368                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
1369                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
1370                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
1371                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
1372                    ]);
1373
1374                    order_by.extend(["max_operator_cpu_ratio DESC", "worker_elapsed DESC"]);
1375
1376                    if set_worker_id {
1377                        order_by.push("worker_id");
1378                    }
1379                } else {
1380                    // no skew, so just compute totals
1381                    ctes.push((
1382                        "per_operator_cpu_totals",
1383                        r#"
1384    SELECT mlm.global_id AS global_id,
1385           mlm.lir_id AS lir_id,
1386           SUM(mse.elapsed_ns) AS total_ns
1387    FROM        mz_introspection.mz_lir_mapping mlm
1388     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1389           JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1390             ON (mse.id = valid_id)
1391    GROUP BY mlm.global_id, mlm.lir_id"#,
1392                    ));
1393
1394                    ctes.push((
1395                        "object_cpu_totals",
1396                        r#"
1397SELECT poct.global_id AS global_id,
1398       SUM(poct.total_ns) AS total_ns
1399FROM per_operator_cpu_totals poct
1400GROUP BY poct.global_id
1401"#,
1402                    ));
1403
1404                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
1405                    columns
1406                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
1407                    order_by.extend(["total_elapsed DESC"]);
1408                }
1409            }
1410        }
1411    }
1412
1413    // generate SQL query text
1414    let ctes = if !ctes.is_empty() {
1415        format!(
1416            "WITH {}",
1417            separated(
1418                ",\n",
1419                ctes.iter()
1420                    .map(|(name, defn)| format!("{name} AS ({defn})"))
1421            )
1422        )
1423    } else {
1424        String::new()
1425    };
1426    let columns = separated(", ", columns);
1427    let from = separated(" ", from);
1428    let predicates = if !predicates.is_empty() {
1429        format!("WHERE {}", separated(" AND ", predicates))
1430    } else {
1431        String::new()
1432    };
1433    // add mo.name last, to break ties only
1434    order_by.push("mo.name DESC");
1435    let order_by = separated(", ", order_by);
1436    let query = format!(
1437        r#"{ctes}
1438SELECT {columns}
1439FROM {from}
1440{predicates}
1441ORDER BY {order_by}"#
1442    );
1443
1444    if statement.as_sql {
1445        let rows = vec![Row::pack_slice(&[Datum::String(
1446            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1447                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1448            })?,
1449        )])];
1450        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1451
1452        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1453    } else {
1454        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1455        show_select.plan()
1456    }
1457}
1458
1459pub fn plan_explain_timestamp(
1460    scx: &StatementContext,
1461    explain: ExplainTimestampStatement<Aug>,
1462) -> Result<Plan, PlanError> {
1463    let (format, _verbose_syntax) = match explain.format() {
1464        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1465        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1466        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1467        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1468    };
1469
1470    let raw_plan = {
1471        let query::PlannedRootQuery {
1472            expr: raw_plan,
1473            desc: _,
1474            finishing: _,
1475            scope: _,
1476        } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1477        if raw_plan.contains_parameters()? {
1478            return Err(PlanError::ParameterNotAllowed(
1479                "EXPLAIN TIMESTAMP".to_string(),
1480            ));
1481        }
1482
1483        raw_plan
1484    };
1485    let when = query::plan_as_of(scx, explain.select.as_of)?;
1486
1487    Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1488        format,
1489        raw_plan,
1490        when,
1491    }))
1492}
1493
1494/// Plans and decorrelates a [`Query`]. Like [`query::plan_root_query`], but
1495/// returns an [`MirRelationExpr`], which cannot include correlated expressions.
1496#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."]
1497pub fn plan_query(
1498    scx: &StatementContext,
1499    query: Query<Aug>,
1500    params: &Params,
1501    lifetime: QueryLifetime,
1502) -> Result<query::PlannedRootQuery<MirRelationExpr>, PlanError> {
1503    let query::PlannedRootQuery {
1504        mut expr,
1505        desc,
1506        finishing,
1507        scope,
1508    } = query::plan_root_query(scx, query, lifetime)?;
1509    expr.bind_parameters(scx, lifetime, params)?;
1510
1511    Ok(query::PlannedRootQuery {
1512        // No metrics passed! One more reason not to use this deprecated function.
1513        expr: expr.lower(scx.catalog.system_vars(), None)?,
1514        desc,
1515        finishing,
1516        scope,
1517    })
1518}
1519
// Generates `SubscribeOptionExtracted`, which the `SUBSCRIBE` handlers in this
// module obtain from the statement's options via `try_into`. Its `snapshot`
// and `progress` fields are read as optional `bool`s; the handlers apply the
// defaults themselves (`snapshot` falls back to `true`, `progress` to `false`).
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1521
1522pub fn describe_subscribe(
1523    scx: &StatementContext,
1524    stmt: SubscribeStatement<Aug>,
1525) -> Result<StatementDesc, PlanError> {
1526    let relation_desc = match stmt.relation {
1527        SubscribeRelation::Name(name) => {
1528            let item = scx.get_item_by_resolved_name(&name)?;
1529            item.desc(&scx.catalog.resolve_full_name(item.name()))?
1530                .into_owned()
1531        }
1532        SubscribeRelation::Query(query) => {
1533            let query::PlannedRootQuery { desc, .. } =
1534                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
1535            desc
1536        }
1537    };
1538    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
1539    let progress = progress.unwrap_or(false);
1540    let mut desc = RelationDesc::builder().with_column(
1541        "mz_timestamp",
1542        SqlScalarType::Numeric {
1543            max_scale: Some(NumericMaxScale::ZERO),
1544        }
1545        .nullable(false),
1546    );
1547    if progress {
1548        desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
1549    }
1550
1551    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
1552    match stmt.output {
1553        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
1554            desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
1555            for (name, mut ty) in relation_desc.into_iter() {
1556                if progress {
1557                    ty.nullable = true;
1558                }
1559                desc = desc.with_column(name, ty);
1560            }
1561        }
1562        SubscribeOutput::EnvelopeUpsert { key_columns }
1563        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
1564            desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
1565            let key_columns = key_columns
1566                .into_iter()
1567                .map(normalize::column_name)
1568                .collect_vec();
1569            let mut before_values_desc = RelationDesc::builder();
1570            let mut after_values_desc = RelationDesc::builder();
1571
1572            // Add the key columns in the order that they're specified.
1573            for column_name in &key_columns {
1574                let mut column_ty = relation_desc
1575                    .get_by_name(column_name)
1576                    .map(|(_pos, ty)| ty.clone())
1577                    .ok_or_else(|| PlanError::UnknownColumn {
1578                        table: None,
1579                        column: column_name.clone(),
1580                        similar: Box::new([]),
1581                    })?;
1582                if progress {
1583                    column_ty.nullable = true;
1584                }
1585                desc = desc.with_column(column_name, column_ty);
1586            }
1587
1588            // Then add the remaining columns in the order from the original
1589            // table, filtering out the key columns since we added those above.
1590            for (mut name, mut ty) in relation_desc
1591                .into_iter()
1592                .filter(|(name, _ty)| !key_columns.contains(name))
1593            {
1594                ty.nullable = true;
1595                before_values_desc =
1596                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
1597                if debezium {
1598                    name = format!("after_{}", name).into();
1599                }
1600                after_values_desc = after_values_desc.with_column(name, ty);
1601            }
1602
1603            if debezium {
1604                desc = desc.concat(before_values_desc);
1605            }
1606            desc = desc.concat(after_values_desc);
1607        }
1608    }
1609    Ok(StatementDesc::new(Some(desc.finish())))
1610}
1611
1612pub fn plan_subscribe(
1613    scx: &StatementContext,
1614    SubscribeStatement {
1615        relation,
1616        options,
1617        as_of,
1618        up_to,
1619        output,
1620    }: SubscribeStatement<Aug>,
1621    params: &Params,
1622    copy_to: Option<CopyFormat>,
1623) -> Result<Plan, PlanError> {
1624    let (from, desc, scope) = match relation {
1625        SubscribeRelation::Name(name) => {
1626            let entry = scx.get_item_by_resolved_name(&name)?;
1627            let desc = match entry.desc(&scx.catalog.resolve_full_name(entry.name())) {
1628                Ok(desc) => desc,
1629                Err(..) => sql_bail!(
1630                    "'{}' cannot be subscribed to because it is a {}",
1631                    name.full_name_str(),
1632                    entry.item_type(),
1633                ),
1634            };
1635            let item_name = match name {
1636                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
1637                _ => None,
1638            };
1639            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
1640            (
1641                SubscribeFrom::Id(entry.global_id()),
1642                desc.into_owned(),
1643                scope,
1644            )
1645        }
1646        SubscribeRelation::Query(query) => {
1647            #[allow(deprecated)] // TODO(aalexandrov): Use HirRelationExpr in Subscribe
1648            let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?;
1649            // There's no way to apply finishing operations to a `SUBSCRIBE` directly, so the
1650            // finishing should have already been turned into a `TopK` by
1651            // `plan_query` / `plan_root_query`, upon seeing the `QueryLifetime::Subscribe`.
1652            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
1653                &query.finishing,
1654                query.desc.arity()
1655            ));
1656            let desc = query.desc.clone();
1657            (
1658                SubscribeFrom::Query {
1659                    expr: query.expr,
1660                    desc: query.desc,
1661                },
1662                desc,
1663                query.scope,
1664            )
1665        }
1666    };
1667
1668    let when = query::plan_as_of(scx, as_of)?;
1669    let up_to = up_to
1670        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
1671        .transpose()?;
1672
1673    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
1674    let ecx = ExprContext {
1675        qcx: &qcx,
1676        name: "",
1677        scope: &scope,
1678        relation_type: desc.typ(),
1679        allow_aggregates: false,
1680        allow_subqueries: true,
1681        allow_parameters: true,
1682        allow_windows: false,
1683    };
1684
1685    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1686    let output = match output {
1687        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
1688        SubscribeOutput::EnvelopeUpsert { key_columns } => {
1689            let order_by = key_columns
1690                .iter()
1691                .map(|ident| OrderByExpr {
1692                    expr: Expr::Identifier(vec![ident.clone()]),
1693                    asc: None,
1694                    nulls_last: None,
1695                })
1696                .collect_vec();
1697            let (order_by, map_exprs) = query::plan_order_by_exprs(
1698                &ExprContext {
1699                    name: "ENVELOPE UPSERT KEY clause",
1700                    ..ecx
1701                },
1702                &order_by[..],
1703                &output_columns[..],
1704            )?;
1705            if !map_exprs.is_empty() {
1706                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
1707            }
1708            plan::SubscribeOutput::EnvelopeUpsert {
1709                order_by_keys: order_by,
1710            }
1711        }
1712        SubscribeOutput::EnvelopeDebezium { key_columns } => {
1713            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
1714            let order_by = key_columns
1715                .iter()
1716                .map(|ident| OrderByExpr {
1717                    expr: Expr::Identifier(vec![ident.clone()]),
1718                    asc: None,
1719                    nulls_last: None,
1720                })
1721                .collect_vec();
1722            let (order_by, map_exprs) = query::plan_order_by_exprs(
1723                &ExprContext {
1724                    name: "ENVELOPE DEBEZIUM KEY clause",
1725                    ..ecx
1726                },
1727                &order_by[..],
1728                &output_columns[..],
1729            )?;
1730            if !map_exprs.is_empty() {
1731                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
1732            }
1733            plan::SubscribeOutput::EnvelopeDebezium {
1734                order_by_keys: order_by,
1735            }
1736        }
1737        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
1738            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
1739            let mz_diff = "mz_diff".into();
1740            let output_columns = std::iter::once((0, &mz_diff))
1741                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
1742                .collect_vec();
1743            match query::plan_order_by_exprs(
1744                &ExprContext {
1745                    name: "WITHIN TIMESTAMP ORDER BY clause",
1746                    ..ecx
1747                },
1748                &order_by[..],
1749                &output_columns[..],
1750            ) {
1751                Err(PlanError::UnknownColumn {
1752                    table: None,
1753                    column,
1754                    similar: _,
1755                }) if &column == &mz_diff => {
1756                    // mz_diff is being used in an expression. Since mz_diff isn't part of the table
1757                    // it looks like an unknown column. Instead, return a better error
1758                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1759                }
1760                Err(e) => return Err(e),
1761                Ok((order_by, map_exprs)) => {
1762                    if !map_exprs.is_empty() {
1763                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1764                    }
1765
1766                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
1767                }
1768            }
1769        }
1770    };
1771
1772    let SubscribeOptionExtracted {
1773        progress, snapshot, ..
1774    } = options.try_into()?;
1775    Ok(Plan::Subscribe(SubscribePlan {
1776        from,
1777        when,
1778        up_to,
1779        with_snapshot: snapshot.unwrap_or(true),
1780        copy_to,
1781        emit_progress: progress.unwrap_or(false),
1782        output,
1783    }))
1784}
1785
1786pub fn describe_copy_from_table(
1787    scx: &StatementContext,
1788    table_name: <Aug as AstInfo>::ItemName,
1789    columns: Vec<Ident>,
1790) -> Result<StatementDesc, PlanError> {
1791    let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1792    Ok(StatementDesc::new(Some(desc)))
1793}
1794
1795pub fn describe_copy_item(
1796    scx: &StatementContext,
1797    object_name: <Aug as AstInfo>::ItemName,
1798    columns: Vec<Ident>,
1799) -> Result<StatementDesc, PlanError> {
1800    let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1801    Ok(StatementDesc::new(Some(desc)))
1802}
1803
1804pub fn describe_copy(
1805    scx: &StatementContext,
1806    CopyStatement {
1807        relation,
1808        direction,
1809        ..
1810    }: CopyStatement<Aug>,
1811) -> Result<StatementDesc, PlanError> {
1812    Ok(match (relation, direction) {
1813        (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1814            describe_copy_item(scx, name, columns)?
1815        }
1816        (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1817            describe_copy_from_table(scx, name, columns)?
1818        }
1819        (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1820        (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1821    }
1822    .with_is_copy())
1823}
1824
1825fn plan_copy_to_expr(
1826    scx: &StatementContext,
1827    select_plan: SelectPlan,
1828    desc: RelationDesc,
1829    to: &Expr<Aug>,
1830    format: CopyFormat,
1831    options: CopyOptionExtracted,
1832) -> Result<Plan, PlanError> {
1833    let conn_id = match options.aws_connection {
1834        Some(conn_id) => CatalogItemId::from(conn_id),
1835        None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1836    };
1837    let connection = scx.get_item(&conn_id).connection()?;
1838
1839    match connection {
1840        mz_storage_types::connections::Connection::Aws(_) => {}
1841        _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1842    }
1843
1844    let format = match format {
1845        CopyFormat::Csv => {
1846            let quote = extract_byte_param_value(options.quote, "quote")?;
1847            let escape = extract_byte_param_value(options.escape, "escape")?;
1848            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1849            S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1850                CopyCsvFormatParams::try_new(
1851                    delimiter,
1852                    quote,
1853                    escape,
1854                    options.header,
1855                    options.null,
1856                )
1857                .map_err(|e| sql_err!("{}", e))?,
1858            ))
1859        }
1860        CopyFormat::Parquet => {
1861            // Validate that the output desc can be formatted as parquet
1862            ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
1863            S3SinkFormat::Parquet
1864        }
1865        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1866        CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1867    };
1868
1869    // Converting the to expr to a HirScalarExpr
1870    let mut to_expr = to.clone();
1871    transform_ast::transform(scx, &mut to_expr)?;
1872    let relation_type = RelationDesc::empty();
1873    let ecx = &ExprContext {
1874        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1875        name: "COPY TO target",
1876        scope: &Scope::empty(),
1877        relation_type: relation_type.typ(),
1878        allow_aggregates: false,
1879        allow_subqueries: false,
1880        allow_parameters: false,
1881        allow_windows: false,
1882    };
1883
1884    let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;
1885
1886    if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1887        sql_bail!(
1888            "MAX FILE SIZE cannot be less than {}",
1889            MIN_S3_SINK_FILE_SIZE
1890        );
1891    }
1892    if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1893        sql_bail!(
1894            "MAX FILE SIZE cannot be greater than {}",
1895            MAX_S3_SINK_FILE_SIZE
1896        );
1897    }
1898
1899    Ok(Plan::CopyTo(CopyToPlan {
1900        select_plan,
1901        desc,
1902        to,
1903        connection: connection.to_owned(),
1904        connection_id: conn_id,
1905        format,
1906        max_file_size: options.max_file_size.as_bytes(),
1907    }))
1908}
1909
1910fn plan_copy_from(
1911    scx: &StatementContext,
1912    target: &CopyTarget<Aug>,
1913    table_name: ResolvedItemName,
1914    columns: Vec<Ident>,
1915    format: CopyFormat,
1916    options: CopyOptionExtracted,
1917) -> Result<Plan, PlanError> {
1918    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
1919        match option {
1920            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
1921            None => Ok(()),
1922        }
1923    }
1924
1925    let source = match target {
1926        CopyTarget::Stdin => CopyFromSource::Stdin,
1927        CopyTarget::Expr(from) => {
1928            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;
1929
1930            // Converting the expr to an HirScalarExpr
1931            let mut from_expr = from.clone();
1932            transform_ast::transform(scx, &mut from_expr)?;
1933            let relation_type = RelationDesc::empty();
1934            let ecx = &ExprContext {
1935                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1936                name: "COPY FROM target",
1937                scope: &Scope::empty(),
1938                relation_type: relation_type.typ(),
1939                allow_aggregates: false,
1940                allow_subqueries: false,
1941                allow_parameters: false,
1942                allow_windows: false,
1943            };
1944            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;
1945
1946            match options.aws_connection {
1947                Some(conn_id) => {
1948                    let conn_id = CatalogItemId::from(conn_id);
1949
1950                    // Validate the connection type is one we expect.
1951                    let connection = match scx.get_item(&conn_id).connection()? {
1952                        mz_storage_types::connections::Connection::Aws(conn) => conn,
1953                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
1954                    };
1955
1956                    CopyFromSource::AwsS3 {
1957                        uri: from,
1958                        connection,
1959                        connection_id: conn_id,
1960                    }
1961                }
1962                None => CopyFromSource::Url(from),
1963            }
1964        }
1965        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
1966    };
1967
1968    let params = match format {
1969        CopyFormat::Text => {
1970            only_available_with_csv(options.quote, "quote")?;
1971            only_available_with_csv(options.escape, "escape")?;
1972            only_available_with_csv(options.header, "HEADER")?;
1973            let delimiter =
1974                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
1975            let null = match options.null {
1976                Some(null) => Cow::from(null),
1977                None => Cow::from("\\N"),
1978            };
1979            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
1980        }
1981        CopyFormat::Csv => {
1982            let quote = extract_byte_param_value(options.quote, "quote")?;
1983            let escape = extract_byte_param_value(options.escape, "escape")?;
1984            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1985            CopyFormatParams::Csv(
1986                CopyCsvFormatParams::try_new(
1987                    delimiter,
1988                    quote,
1989                    escape,
1990                    options.header,
1991                    options.null,
1992                )
1993                .map_err(|e| sql_err!("{}", e))?,
1994            )
1995        }
1996        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1997        CopyFormat::Parquet => CopyFormatParams::Parquet,
1998    };
1999
2000    let filter = match (options.files, options.pattern) {
2001        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
2002        (Some(files), None) => Some(CopyFromFilter::Files(files)),
2003        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
2004        (None, None) => None,
2005    };
2006
2007    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
2008        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
2009    }
2010
2011    let table_name_string = table_name.full_name_str();
2012
2013    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;
2014
2015    let Some(mfp) = maybe_mfp else {
2016        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
2017    };
2018
2019    Ok(Plan::CopyFrom(CopyFromPlan {
2020        target_id: id,
2021        target_name: table_name_string,
2022        source,
2023        columns,
2024        source_desc,
2025        mfp,
2026        params,
2027        filter,
2028    }))
2029}
2030
2031fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2032    match v {
2033        Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2034        Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2035        None => Ok(None),
2036    }
2037}
2038
// Generates `CopyOptionExtracted`, which `plan_copy` builds from the options
// of a `COPY` statement via `try_from`. The COPY planners in this module read
// `max_file_size` directly (it is declared with a default of
// `ByteSize::mb(256)`), while the other options are consumed as `Option`s.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
2052
2053pub fn plan_copy(
2054    scx: &StatementContext,
2055    CopyStatement {
2056        relation,
2057        direction,
2058        target,
2059        options,
2060    }: CopyStatement<Aug>,
2061) -> Result<Plan, PlanError> {
2062    let options = CopyOptionExtracted::try_from(options)?;
2063    // Parse any user-provided FORMAT option. If not provided, will default to
2064    // Text for COPY TO STDOUT and COPY FROM STDIN, but will error for COPY TO <expr>.
2065    let format = options
2066        .format
2067        .as_ref()
2068        .map(|format| match format.to_lowercase().as_str() {
2069            "text" => Ok(CopyFormat::Text),
2070            "csv" => Ok(CopyFormat::Csv),
2071            "binary" => Ok(CopyFormat::Binary),
2072            "parquet" => Ok(CopyFormat::Parquet),
2073            _ => sql_bail!("unknown FORMAT: {}", format),
2074        })
2075        .transpose()?;
2076
2077    match (&direction, &target) {
2078        (CopyDirection::To, CopyTarget::Stdout) => {
2079            if options.delimiter.is_some() {
2080                sql_bail!("COPY TO does not support DELIMITER option yet");
2081            }
2082            if options.quote.is_some() {
2083                sql_bail!("COPY TO does not support QUOTE option yet");
2084            }
2085            if options.null.is_some() {
2086                sql_bail!("COPY TO does not support NULL option yet");
2087            }
2088            match relation {
2089                CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
2090                CopyRelation::Select(stmt) => Ok(plan_select(
2091                    scx,
2092                    stmt,
2093                    &Params::empty(),
2094                    Some(format.unwrap_or(CopyFormat::Text)),
2095                )?),
2096                CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
2097                    scx,
2098                    stmt,
2099                    &Params::empty(),
2100                    Some(format.unwrap_or(CopyFormat::Text)),
2101                )?),
2102            }
2103        }
2104        (CopyDirection::From, target) => match relation {
2105            CopyRelation::Named { name, columns } => plan_copy_from(
2106                scx,
2107                target,
2108                name,
2109                columns,
2110                format.unwrap_or(CopyFormat::Text),
2111                options,
2112            ),
2113            _ => sql_bail!("COPY FROM {} not supported", target),
2114        },
2115        (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
2116            // System users are always allowed to use this feature, even when
2117            // the flag is disabled, so that we can dogfood for analytics in
2118            // production environments. The feature is stable enough that we're
2119            // not worried about it crashing.
2120            if !scx.catalog.active_role_id().is_system() {
2121                scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
2122            }
2123
2124            let format = match format {
2125                Some(inner) => inner,
2126                _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
2127            };
2128
2129            let stmt = match relation {
2130                CopyRelation::Named { name, columns } => {
2131                    if !columns.is_empty() {
2132                        // TODO(mouli): Add support for this
2133                        sql_bail!(
2134                            "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
2135                        );
2136                    }
2137                    // Generate a synthetic SELECT query that just gets the table
2138                    let query = Query {
2139                        ctes: CteBlock::empty(),
2140                        body: SetExpr::Table(name),
2141                        order_by: vec![],
2142                        limit: None,
2143                        offset: None,
2144                    };
2145                    SelectStatement { query, as_of: None }
2146                }
2147                CopyRelation::Select(stmt) => {
2148                    if !stmt.query.order_by.is_empty() {
2149                        sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
2150                    }
2151                    stmt
2152                }
2153                _ => sql_bail!("COPY {} {} not supported", direction, target),
2154            };
2155
2156            let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
2157            plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
2158        }
2159        _ => sql_bail!("COPY {} {} not supported", direction, target),
2160    }
2161}