// mz_sql/plan/statement/dml.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data manipulation language (DML).
11//!
12//! This module houses the handlers for statements that manipulate data, like
13//! `INSERT`, `SELECT`, `SUBSCRIBE`, and `COPY`.
14
15use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19
20use mz_arrow_util::builder::ArrowBuilder;
21use mz_expr::visit::Visit;
22use mz_expr::{MirRelationExpr, RowSetFinishing};
23use mz_ore::num::NonNeg;
24use mz_ore::soft_panic_or_log;
25use mz_ore::str::separated;
26use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
27use mz_repr::adt::numeric::NumericMaxScale;
28use mz_repr::bytes::ByteSize;
29use mz_repr::explain::{ExplainConfig, ExplainFormat};
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
32use mz_sql_parser::ast::{
33    CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
34    ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
35    ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
36    ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
37    SetExpr, SubscribeOutput, UnresolvedItemName,
38};
39use mz_sql_parser::ident;
40use mz_storage_types::sinks::{
41    KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
42    MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
43};
44
45use crate::ast::display::AstDisplay;
46use crate::ast::{
47    AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
48    DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
49    SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
50    UpdateStatement,
51};
52use crate::catalog::CatalogItemType;
53use crate::names::{Aug, ResolvedItemName};
54use crate::normalize;
55use crate::plan::query::{
56    ExprContext, QueryLifetime, offset_into_value, plan_as_of_or_up_to, plan_expr,
57};
58use crate::plan::scope::Scope;
59use crate::plan::statement::show::ShowSelect;
60use crate::plan::statement::{StatementContext, StatementDesc, ddl};
61use crate::plan::{
62    self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
63    ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
64};
65use crate::plan::{
66    CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
67    QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
68};
69use crate::plan::{CopyFromSource, with_options};
70use crate::session::vars::{self, ENABLE_COPY_FROM_REMOTE};
71
72// TODO(benesch): currently, describing a `SELECT` or `INSERT` query
73// plans the whole query to determine its shape and parameter types,
74// and then throws away that plan. If we were smarter, we'd stash that
75// plan somewhere so we don't have to recompute it when the query is
76// executed.
77
78pub fn describe_insert(
79    scx: &StatementContext,
80    InsertStatement {
81        table_name,
82        columns,
83        source,
84        returning,
85    }: InsertStatement<Aug>,
86) -> Result<StatementDesc, PlanError> {
87    let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
88    let desc = if returning.expr.is_empty() {
89        None
90    } else {
91        Some(returning.desc)
92    };
93    Ok(StatementDesc::new(desc))
94}
95
96pub fn plan_insert(
97    scx: &StatementContext,
98    InsertStatement {
99        table_name,
100        columns,
101        source,
102        returning,
103    }: InsertStatement<Aug>,
104    params: &Params,
105) -> Result<Plan, PlanError> {
106    let (id, mut expr, returning) =
107        query::plan_insert_query(scx, table_name, columns, source, returning)?;
108    expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
109    let returning = returning
110        .expr
111        .into_iter()
112        .map(|mut expr| {
113            expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
114            expr.lower_uncorrelated(scx.catalog.system_vars())
115        })
116        .collect::<Result<Vec<_>, _>>()?;
117
118    Ok(Plan::Insert(InsertPlan {
119        id,
120        values: expr,
121        returning,
122    }))
123}
124
125pub fn describe_delete(
126    scx: &StatementContext,
127    stmt: DeleteStatement<Aug>,
128) -> Result<StatementDesc, PlanError> {
129    query::plan_delete_query(scx, stmt)?;
130    Ok(StatementDesc::new(None))
131}
132
133pub fn plan_delete(
134    scx: &StatementContext,
135    stmt: DeleteStatement<Aug>,
136    params: &Params,
137) -> Result<Plan, PlanError> {
138    let rtw_plan = query::plan_delete_query(scx, stmt)?;
139    plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
140}
141
142pub fn describe_update(
143    scx: &StatementContext,
144    stmt: UpdateStatement<Aug>,
145) -> Result<StatementDesc, PlanError> {
146    query::plan_update_query(scx, stmt)?;
147    Ok(StatementDesc::new(None))
148}
149
150pub fn plan_update(
151    scx: &StatementContext,
152    stmt: UpdateStatement<Aug>,
153    params: &Params,
154) -> Result<Plan, PlanError> {
155    let rtw_plan = query::plan_update_query(scx, stmt)?;
156    plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
157}
158
159pub fn plan_read_then_write(
160    scx: &StatementContext,
161    kind: MutationKind,
162    params: &Params,
163    query::ReadThenWritePlan {
164        id,
165        mut selection,
166        finishing,
167        assignments,
168    }: query::ReadThenWritePlan,
169) -> Result<Plan, PlanError> {
170    selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
171    let mut assignments_outer = BTreeMap::new();
172    for (idx, mut set) in assignments {
173        set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
174        let set = set.lower_uncorrelated(scx.catalog.system_vars())?;
175        assignments_outer.insert(idx, set);
176    }
177
178    Ok(Plan::ReadThenWrite(ReadThenWritePlan {
179        id,
180        selection,
181        finishing,
182        assignments: assignments_outer,
183        kind,
184        returning: Vec::new(),
185    }))
186}
187
188pub fn describe_select(
189    scx: &StatementContext,
190    stmt: SelectStatement<Aug>,
191) -> Result<StatementDesc, PlanError> {
192    if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
193        return Ok(StatementDesc::new(Some(desc)));
194    }
195
196    let query::PlannedRootQuery { desc, .. } =
197        query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
198    Ok(StatementDesc::new(Some(desc)))
199}
200
201pub fn plan_select(
202    scx: &StatementContext,
203    select: SelectStatement<Aug>,
204    params: &Params,
205    copy_to: Option<CopyFormat>,
206) -> Result<Plan, PlanError> {
207    if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
208        return Ok(Plan::SideEffectingFunc(f));
209    }
210
211    let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
212    Ok(Plan::Select(plan))
213}
214
/// Plans a `SELECT` statement into a [`SelectPlan`] together with the
/// [`RelationDesc`] describing the rows it returns.
///
/// Binds parameters throughout, simplifies `OFFSET` expressions inside the
/// plan to literals, and concretizes the top-level `LIMIT`/`OFFSET` of the
/// row-set finishing to plain numbers.
fn plan_select_inner(
    scx: &StatementContext,
    select: SelectStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<(SelectPlan, RelationDesc), PlanError> {
    // Plan the AS OF clause (if any) and the root query itself.
    let when = query::plan_as_of(scx, select.as_of.clone())?;
    let lifetime = QueryLifetime::OneShot;
    let query::PlannedRootQuery {
        mut expr,
        desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
    expr.bind_parameters(scx, lifetime, params)?;

    // OFFSET clauses in `expr` should become constants with the above binding of parameters.
    // Let's check this and simplify them to literals.
    expr.try_visit_mut_pre(&mut |expr| {
        if let HirRelationExpr::TopK { offset, .. } = expr {
            let offset_value = offset_into_value(offset.take())?;
            *offset = HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64);
        }
        Ok::<(), PlanError>(())
    })?;
    // (We don't need to simplify LIMIT clauses in `expr`, because we can handle non-constant
    // expressions there. If they happen to be simplifiable to literals, then the optimizer will do
    // so later.)

    // We need to concretize the `limit` and `offset` of the RowSetFinishing, so that we go from
    // `RowSetFinishing<HirScalarExpr, HirScalarExpr>` to `RowSetFinishing`.
    // This involves binding parameters and evaluating each expression to a number.
    // (This should be possible even for `limit` here, because we are at the top level of a SELECT,
    // so this `limit` has to be a constant.)
    let limit = match finishing.limit {
        None => None,
        Some(mut limit) => {
            limit.bind_parameters(scx, lifetime, params)?;
            // TODO: Call `try_into_literal_int64` instead of `as_literal`.
            let Some(limit) = limit.as_literal() else {
                sql_bail!(
                    "Top-level LIMIT must be a constant expression, got {}",
                    limit
                )
            };
            // NULL means "no limit"; a non-negative Int64 is the limit proper.
            match limit {
                Datum::Null => None,
                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
                _ => {
                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
                    sql_bail!("LIMIT must be a non-negative INT or NULL")
                }
            }
        }
    };
    let offset = {
        let mut offset = finishing.offset.clone();
        offset.bind_parameters(scx, lifetime, params)?;
        let offset = offset_into_value(offset.take())?;
        offset
            .try_into()
            .expect("checked in offset_into_value that it is not negative")
    };

    let plan = SelectPlan {
        source: expr,
        when,
        finishing: RowSetFinishing {
            limit,
            offset,
            project: finishing.project,
            order_by: finishing.order_by,
        },
        copy_to,
        // Retain the original AST for later use (e.g. display/insights).
        select: Some(Box::new(select)),
    };

    Ok((plan, desc))
}
294
295pub fn describe_explain_plan(
296    scx: &StatementContext,
297    explain: ExplainPlanStatement<Aug>,
298) -> Result<StatementDesc, PlanError> {
299    let mut relation_desc = RelationDesc::builder();
300
301    match explain.stage() {
302        ExplainStage::RawPlan => {
303            let name = "Raw Plan";
304            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
305        }
306        ExplainStage::DecorrelatedPlan => {
307            let name = "Decorrelated Plan";
308            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
309        }
310        ExplainStage::LocalPlan => {
311            let name = "Locally Optimized Plan";
312            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
313        }
314        ExplainStage::GlobalPlan => {
315            let name = "Optimized Plan";
316            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
317        }
318        ExplainStage::PhysicalPlan => {
319            let name = "Physical Plan";
320            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
321        }
322        ExplainStage::Trace => {
323            relation_desc = relation_desc
324                .with_column("Time", SqlScalarType::UInt64.nullable(false))
325                .with_column("Path", SqlScalarType::String.nullable(false))
326                .with_column("Plan", SqlScalarType::String.nullable(false));
327        }
328        ExplainStage::PlanInsights => {
329            let name = "Plan Insights";
330            relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
331        }
332    };
333    let relation_desc = relation_desc.finish();
334
335    Ok(
336        StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
337            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
338            _ => vec![],
339        }),
340    )
341}
342
343pub fn describe_explain_pushdown(
344    scx: &StatementContext,
345    statement: ExplainPushdownStatement<Aug>,
346) -> Result<StatementDesc, PlanError> {
347    let relation_desc = RelationDesc::builder()
348        .with_column("Source", SqlScalarType::String.nullable(false))
349        .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
350        .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
351        .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
352        .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
353        .finish();
354
355    Ok(
356        StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
357            Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
358            _ => vec![],
359        }),
360    )
361}
362
/// Describes the output shape of `EXPLAIN ANALYZE ... <object>`.
///
/// The column set depends on the requested properties (memory/CPU vs. hints)
/// and on whether per-worker skew output was requested. Column order matters:
/// it must match the order in which the analysis emits values.
pub fn describe_explain_analyze_object(
    _scx: &StatementContext,
    statement: ExplainAnalyzeObjectStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // With `AS SQL` the output is a single `SQL` text column.
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    match statement.properties {
        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
            properties,
            skew,
        }) => {
            // Every computation analysis keys its rows by operator.
            let mut relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(false));

            // Skew output adds a per-worker breakdown column.
            if skew {
                relation_desc =
                    relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
            }

            let mut seen_properties = BTreeSet::new();
            for property in properties {
                // handle each property only once (belt and suspenders)
                if !seen_properties.insert(property) {
                    continue;
                }

                match property {
                    // Memory with skew: per-worker ratios plus aggregates.
                    ExplainAnalyzeComputationProperty::Memory if skew => {
                        let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                        relation_desc = relation_desc
                            .with_column("memory_ratio", numeric.clone())
                            .with_column("worker_memory", SqlScalarType::String.nullable(true))
                            .with_column("avg_memory", SqlScalarType::String.nullable(true))
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column("records_ratio", numeric.clone())
                            .with_column("worker_records", numeric.clone())
                            .with_column("avg_records", numeric.clone())
                            .with_column("total_records", numeric);
                    }
                    // Memory without skew: totals only.
                    ExplainAnalyzeComputationProperty::Memory => {
                        relation_desc = relation_desc
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column(
                                "total_records",
                                SqlScalarType::Numeric { max_scale: None }.nullable(true),
                            );
                    }
                    // CPU: optional per-worker columns, always a total.
                    ExplainAnalyzeComputationProperty::Cpu => {
                        if skew {
                            relation_desc = relation_desc
                                .with_column(
                                    "cpu_ratio",
                                    SqlScalarType::Numeric { max_scale: None }.nullable(true),
                                )
                                .with_column(
                                    "worker_elapsed",
                                    SqlScalarType::Interval.nullable(true),
                                )
                                .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
                        }
                        relation_desc = relation_desc
                            .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
                    }
                }
            }

            let relation_desc = relation_desc.finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
        // Hints analysis has a fixed column set, independent of skew.
        ExplainAnalyzeProperty::Hints => {
            let relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(true))
                .with_column("levels", SqlScalarType::Int64.nullable(true))
                .with_column("to_cut", SqlScalarType::Int64.nullable(true))
                .with_column("hint", SqlScalarType::Float64.nullable(true))
                .with_column("savings", SqlScalarType::String.nullable(true))
                .finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
    }
}
449
/// Describes the output shape of `EXPLAIN ANALYZE ... CLUSTER`.
///
/// Mirrors [`describe_explain_analyze_object`], but rows are keyed by object
/// (name and global ID) rather than by operator, and the ratio columns are
/// named `max_operator_*`. Column order must match the emitted values.
pub fn describe_explain_analyze_cluster(
    _scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
) -> Result<StatementDesc, PlanError> {
    // With `AS SQL` the output is a single `SQL` text column.
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;

    // Rows are keyed by the analyzed object.
    let mut relation_desc = RelationDesc::builder()
        .with_column("object", SqlScalarType::String.nullable(false))
        .with_column("global_id", SqlScalarType::String.nullable(false));

    // Skew output adds a per-worker breakdown column.
    if skew {
        relation_desc =
            relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
    }

    let mut seen_properties = BTreeSet::new();
    for property in properties {
        // handle each property only once (belt and suspenders)
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            // Memory with skew: per-worker ratios plus aggregates.
            ExplainAnalyzeComputationProperty::Memory if skew => {
                let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                relation_desc = relation_desc
                    .with_column("max_operator_memory_ratio", numeric.clone())
                    .with_column("worker_memory", SqlScalarType::String.nullable(true))
                    .with_column("avg_memory", SqlScalarType::String.nullable(true))
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column("max_operator_records_ratio", numeric.clone())
                    .with_column("worker_records", numeric.clone())
                    .with_column("avg_records", numeric.clone())
                    .with_column("total_records", numeric);
            }
            // Memory without skew: totals only.
            ExplainAnalyzeComputationProperty::Memory => {
                relation_desc = relation_desc
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column(
                        "total_records",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    );
            }
            // CPU with skew: per-worker columns plus the total.
            ExplainAnalyzeComputationProperty::Cpu if skew => {
                relation_desc = relation_desc
                    .with_column(
                        "max_operator_cpu_ratio",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    )
                    .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
            // CPU without skew: total only.
            ExplainAnalyzeComputationProperty::Cpu => {
                relation_desc = relation_desc
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
        }
    }

    Ok(StatementDesc::new(Some(relation_desc.finish())))
}
519
520pub fn describe_explain_timestamp(
521    scx: &StatementContext,
522    ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
523) -> Result<StatementDesc, PlanError> {
524    let relation_desc = RelationDesc::builder()
525        .with_column("Timestamp", SqlScalarType::String.nullable(false))
526        .finish();
527
528    Ok(StatementDesc::new(Some(relation_desc))
529        .with_params(describe_select(scx, select)?.param_types))
530}
531
532pub fn describe_explain_schema(
533    _: &StatementContext,
534    ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
535) -> Result<StatementDesc, PlanError> {
536    let relation_desc = RelationDesc::builder()
537        .with_column("Schema", SqlScalarType::String.nullable(false))
538        .finish();
539    Ok(StatementDesc::new(Some(relation_desc)))
540}
541
542// Currently, there are two reasons for why a flag should be `Option<bool>` instead of simply
543// `bool`:
544// - When it's an override of a global feature flag, for example optimizer feature flags. In this
545//   case, we need not just false and true, but also None to say "take the value of the global
546//   flag".
547// - When it's an override of whether SOFT_ASSERTIONS are enabled. For example, when `Arity` is not
548//   explicitly given in the EXPLAIN command, then we'd like staging and prod to default to true,
549//   but otherwise we'd like to default to false.
// Generates `ExplainPlanOptionExtracted`, which collects the typed value of
// each `EXPLAIN ... WITH (...)` option, falling back to the listed default
// when an option is not given. The `Option<bool>` entries distinguish
// "unset" from an explicit true/false (see the comment above).
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    // The remaining options override global optimizer feature flags.
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
584
/// Converts the extracted `EXPLAIN` options into an [`ExplainConfig`],
/// resolving environment-dependent defaults.
impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
    type Error = PlanError;

    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
        // If `WITH(raw)` is specified, ensure that the config will be as
        // representative for the original plan as possible.
        if v.raw {
            v.raw_plans = true;
            v.raw_syntax = true;
        }

        // Certain config should default to be enabled in release builds running on
        // staging or prod (where SOFT_ASSERTIONS are turned off).
        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();

        Ok(ExplainConfig {
            arity: v.arity.unwrap_or(enable_on_prod),
            cardinality: v.cardinality,
            column_names: v.column_names,
            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
            // Humanization and linear chains are suppressed for raw plans.
            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
            join_impls: v.join_implementations,
            keys: v.keys,
            linear_chains: !v.raw_plans && v.linear_chains,
            no_fast_path: v.no_fast_path,
            no_notices: v.no_notices,
            node_ids: v.node_identifiers,
            non_negative: v.non_negative,
            raw_plans: v.raw_plans,
            raw_syntax: v.raw_syntax,
            verbose_syntax: false,
            redacted: v.redacted,
            subtree_size: v.subtree_size,
            equivalences: v.equivalences,
            timing: v.timing,
            types: v.types,
            // The ones that are initialized with `Default::default()` are not wired up to EXPLAIN.
            features: OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: Default::default(),
                enable_eager_delta_joins: v.enable_eager_delta_joins,
                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
                enable_consolidate_after_union_negate: Default::default(),
                enable_reduce_mfp_fusion: Default::default(),
                enable_cardinality_estimates: Default::default(),
                persist_fast_path_limit: Default::default(),
                reoptimize_imported_views: v.reoptimize_imported_views,
                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse: v
                    .enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: Default::default(),
                enable_dequadratic_eqprop_map: Default::default(),
                enable_eq_classes_withholding_errors: Default::default(),
                enable_fast_path_plan_insights: Default::default(),
                enable_cast_elimination: Default::default(),
            },
        })
    }
}
645
646fn plan_explainee(
647    scx: &StatementContext,
648    explainee: Explainee<Aug>,
649    params: &Params,
650) -> Result<plan::Explainee, PlanError> {
651    use crate::plan::ExplaineeStatement;
652
653    let is_replan = matches!(
654        explainee,
655        Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
656    );
657
658    let explainee = match explainee {
659        Explainee::View(name) | Explainee::ReplanView(name) => {
660            let item = scx.get_item_by_resolved_name(&name)?;
661            let item_type = item.item_type();
662            if item_type != CatalogItemType::View {
663                sql_bail!("Expected {name} to be a view, not a {item_type}");
664            }
665            match is_replan {
666                true => crate::plan::Explainee::ReplanView(item.id()),
667                false => crate::plan::Explainee::View(item.id()),
668            }
669        }
670        Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
671            let item = scx.get_item_by_resolved_name(&name)?;
672            let item_type = item.item_type();
673            if item_type != CatalogItemType::MaterializedView {
674                sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
675            }
676            match is_replan {
677                true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
678                false => crate::plan::Explainee::MaterializedView(item.id()),
679            }
680        }
681        Explainee::Index(name) | Explainee::ReplanIndex(name) => {
682            let item = scx.get_item_by_resolved_name(&name)?;
683            let item_type = item.item_type();
684            if item_type != CatalogItemType::Index {
685                sql_bail!("Expected {name} to be an index, not a {item_type}");
686            }
687            match is_replan {
688                true => crate::plan::Explainee::ReplanIndex(item.id()),
689                false => crate::plan::Explainee::Index(item.id()),
690            }
691        }
692        Explainee::Select(select, broken) => {
693            let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
694            crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
695        }
696        Explainee::CreateView(mut stmt, broken) => {
697            if stmt.if_exists != IfExistsBehavior::Skip {
698                // If we don't force this parameter to Skip planning will
699                // fail for names that already exist in the catalog. This
700                // can happen even in `Replace` mode if the existing item
701                // has dependencies.
702                stmt.if_exists = IfExistsBehavior::Skip;
703            } else {
704                sql_bail!(
705                    "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
706                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
707                );
708            }
709
710            let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
711                sql_bail!("expected CreateViewPlan plan");
712            };
713
714            crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
715        }
716        Explainee::CreateMaterializedView(mut stmt, broken) => {
717            if stmt.if_exists != IfExistsBehavior::Skip {
718                // If we don't force this parameter to Skip planning will
719                // fail for names that already exist in the catalog. This
720                // can happen even in `Replace` mode if the existing item
721                // has dependencies.
722                stmt.if_exists = IfExistsBehavior::Skip;
723            } else {
724                sql_bail!(
725                    "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
726                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
727                );
728            }
729
730            let Plan::CreateMaterializedView(plan) =
731                ddl::plan_create_materialized_view(scx, *stmt)?
732            else {
733                sql_bail!("expected CreateMaterializedViewPlan plan");
734            };
735
736            crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
737                broken,
738                plan,
739            })
740        }
741        Explainee::CreateIndex(mut stmt, broken) => {
742            if !stmt.if_not_exists {
743                // If we don't force this parameter to true planning will
744                // fail for index items that already exist in the catalog.
745                stmt.if_not_exists = true;
746            } else {
747                sql_bail!(
748                    "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
749                     (the behavior is implied within the scope of an enclosing EXPLAIN)"
750                );
751            }
752
753            let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
754                sql_bail!("expected CreateIndexPlan plan");
755            };
756
757            crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
758        }
759        Explainee::Subscribe(stmt, broken) => {
760            let Plan::Subscribe(plan) = plan_subscribe(scx, *stmt, params, None)? else {
761                sql_bail!("expected SubscribePlan");
762            };
763            crate::plan::Explainee::Statement(ExplaineeStatement::Subscribe { broken, plan })
764        }
765    };
766
767    Ok(explainee)
768}
769
770pub fn plan_explain_plan(
771    scx: &StatementContext,
772    explain: ExplainPlanStatement<Aug>,
773    params: &Params,
774) -> Result<Plan, PlanError> {
775    let (format, verbose_syntax) = match explain.format() {
776        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
777        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
778        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
779        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
780    };
781    let stage = explain.stage();
782
783    // Plan ExplainConfig.
784    let mut config = {
785        let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
786
787        if !scx.catalog.system_vars().persist_stats_filter_enabled() {
788            // If filtering is disabled, explain plans should not include pushdown info.
789            with_options.filter_pushdown = Some(false);
790        }
791
792        ExplainConfig::try_from(with_options)?
793    };
794    config.verbose_syntax = verbose_syntax;
795
796    let explainee = plan_explainee(scx, explain.explainee, params)?;
797
798    Ok(Plan::ExplainPlan(ExplainPlanPlan {
799        stage,
800        format,
801        config,
802        explainee,
803    }))
804}
805
/// Plans an `EXPLAIN SCHEMA` statement for a `CREATE SINK`.
///
/// Plans the embedded `CREATE SINK` (without executing it) and extracts the
/// Avro key or value schema from the resulting Kafka sink connection. Errors
/// if the sink is not a Kafka sink with an Avro value format, or if the key
/// schema is requested but the sink has no key.
pub fn plan_explain_schema(
    scx: &StatementContext,
    explain_schema: ExplainSinkSchemaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let ExplainSinkSchemaStatement {
        schema_for,
        // Parser limits to JSON.
        format: _,
        mut statement,
    } = explain_schema;

    // Force the sink's name to one that's guaranteed not to exist, by virtue of
    // being a non-existent item in a schema under the system's control, so that
    // `plan_create_sink` doesn't complain about the name already existing.
    statement.name = Some(UnresolvedItemName::qualified(&[
        ident!("mz_catalog"),
        ident!("mz_explain_schema"),
    ]));

    // Resolve any `DOC ON` options in the Avro format before planning, as
    // regular `CREATE SINK` purification would.
    crate::pure::purify_create_sink_avro_doc_on_options(
        scx.catalog,
        *statement.from.item_id(),
        &mut statement.format,
    )?;

    match ddl::plan_create_sink(scx, statement)? {
        // Only Kafka sinks whose *value* format is Avro carry a JSON schema we
        // can report; everything else is rejected below.
        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
            StorageSinkConnection::Kafka(KafkaSinkConnection {
                format:
                    KafkaSinkFormat {
                        key_format,
                        value_format:
                            KafkaSinkFormatType::Avro {
                                schema: value_schema,
                                ..
                            },
                        ..
                    },
                ..
            }) => {
                let schema = match schema_for {
                    // The key format is optional, and even when present it may
                    // not be Avro; both cases surface the same error.
                    ExplainSinkSchemaFor::Key => key_format
                        .and_then(|f| match f {
                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
                            _ => None,
                        })
                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
                    ExplainSinkSchemaFor::Value => value_schema,
                };

                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
                    sink_from: sink.from,
                    json_schema: schema,
                }))
            }
            _ => bail_unsupported!(
                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
            ),
        },
        _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
    }
}
868
869pub fn plan_explain_pushdown(
870    scx: &StatementContext,
871    statement: ExplainPushdownStatement<Aug>,
872    params: &Params,
873) -> Result<Plan, PlanError> {
874    scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
875    let explainee = plan_explainee(scx, statement.explainee, params)?;
876    Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
877}
878
/// Plans an `EXPLAIN ANALYZE ... <object>` statement.
///
/// Instead of producing a bespoke plan, this assembles the *text* of a SQL
/// query over `mz_introspection` relations (`mz_lir_mapping`,
/// `mz_mappable_objects`, and per-worker size/scheduling views) that reports
/// per-LIR-operator statistics for the named object. With `AS SQL` the query
/// text itself is returned as a one-row result; otherwise the query is planned
/// like a `SHOW`-style select.
pub fn plan_explain_analyze_object(
    scx: &StatementContext,
    statement: ExplainAnalyzeObjectStatement<Aug>,
    params: &Params,
) -> Result<Plan, PlanError> {
    // The generated query filters on the object's name, so anonymous
    // explainees cannot be analyzed.
    let explainee_name = statement
        .explainee
        .name()
        .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
        .full_name_str();
    let explainee = plan_explainee(scx, statement.explainee, params)?;

    // Only indexes and materialized views are supported explainees here.
    match explainee {
        plan::Explainee::Index(_index_id) => (),
        plan::Explainee::MaterializedView(_item_id) => (),
        _ => {
            return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
        }
    };

    // generate SQL query
    //
    // The pieces below (`ctes`, `columns`, `from`, `predicates`, `order_by`)
    // are accumulated per requested property and stitched together at the end
    // following this template:

    /* WITH {CTEs}
       SELECT REPEAT(' ', nesting * 2) || operator AS operator
             {columns}
        FROM      mz_introspection.mz_lir_mapping mlm
             JOIN {from} USING (lir_id)
             JOIN mz_introspection.mz_mappable_objects mo
               ON (mlm.global_id = mo.global_id)
       WHERE     mo.name = '{plan.explainee_name}'
             AND {predicates}
       ORDER BY lir_id DESC
    */
    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
    let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
    let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
    let mut predicates = vec![format!("mo.name = '{}'", explainee_name)];
    let mut order_by = vec!["mlm.lir_id DESC"];

    match statement.properties {
        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
            properties,
            skew,
        }) => {
            // First skew-enabled property claims the `worker_id` output
            // column; subsequent ones add a join predicate against it instead.
            let mut worker_id = None;
            let mut seen_properties = BTreeSet::new();
            for property in properties {
                // handle each property only once (belt and suspenders)
                if !seen_properties.insert(property) {
                    continue;
                }

                match property {
                    ExplainAnalyzeComputationProperty::Memory => {
                        // Per-operator totals and per-worker averages of
                        // arrangement sizes/records, keyed by (global_id, lir_id).
                        ctes.push((
                            "summary_memory",
                            r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         SUM(mas.size) AS total_memory,
         SUM(mas.records) AS total_records,
         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
         CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
                 ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                        ));
                        from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");

                        if skew {
                            // Per-worker breakdown, used to compute skew ratios
                            // against the averages above.
                            ctes.push((
                                "per_worker_memory",
                                r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         mas.worker_id AS worker_id,
         SUM(mas.size) AS worker_memory,
         SUM(mas.records) AS worker_records
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
                 ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                            ));
                            from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");

                            if let Some(worker_id) = worker_id {
                                // Another property already owns worker_id;
                                // equate ours with it.
                                predicates.push(format!("pwm.worker_id = {worker_id}"));
                            } else {
                                worker_id = Some("pwm.worker_id");
                                columns.push("pwm.worker_id AS worker_id");
                                order_by.push("worker_id");
                            }

                            columns.extend([
                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
                                "pg_size_pretty(pwm.worker_memory) AS worker_memory",
                                "pg_size_pretty(sm.avg_memory) AS avg_memory",
                                "pg_size_pretty(sm.total_memory) AS total_memory",
                                "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
                                "pwm.worker_records AS worker_records",
                                "sm.avg_records AS avg_records",
                                "sm.total_records AS total_records",
                            ]);
                        } else {
                            // No skew requested: totals only.
                            columns.extend([
                                "pg_size_pretty(sm.total_memory) AS total_memory",
                                "sm.total_records AS total_records",
                            ]);
                        }
                    }
                    ExplainAnalyzeComputationProperty::Cpu => {
                        // Per-operator totals and per-worker averages of
                        // elapsed scheduling time (nanoseconds).
                        ctes.push((
                            "summary_cpu",
                            r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         SUM(mse.elapsed_ns) AS total_ns,
         CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
                 ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                        ));
                        from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");

                        if skew {
                            ctes.push((
                                "per_worker_cpu",
                                r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         mse.worker_id AS worker_id,
         SUM(mse.elapsed_ns) AS worker_ns
    FROM            mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
               JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
                 ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                            ));
                            from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");

                            if let Some(worker_id) = worker_id {
                                predicates.push(format!("pwc.worker_id = {worker_id}"));
                            } else {
                                worker_id = Some("pwc.worker_id");
                                columns.push("pwc.worker_id AS worker_id");
                                order_by.push("worker_id");
                            }

                            columns.extend([
                                "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
                                "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
                                "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
                            ]);
                        }
                        // Unlike memory, the CPU total column is emitted in
                        // both the skew and no-skew cases.
                        columns.push(
                            "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
                        );
                    }
                }
            }
        }
        ExplainAnalyzeProperty::Hints => {
            // Report expected-group-size advice per operator region.
            columns.extend([
                "megsa.levels AS levels",
                "megsa.to_cut AS to_cut",
                "megsa.hint AS hint",
                "pg_size_pretty(megsa.savings) AS savings",
            ]);
            from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
            "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
             mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
        }
    }

    // Restrict to the named object via `mz_mappable_objects`.
    from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");

    // Assemble the final query text from the accumulated pieces.
    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    let predicates = separated(" AND ", predicates);
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
WHERE {predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // `AS SQL`: return the pretty-printed query text as a single row.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        // Otherwise plan the generated query like a SHOW statement.
        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        show_select.plan()
    }
}
1098
/// Plans an `EXPLAIN ANALYZE ... CLUSTER` statement.
///
/// Like [`plan_explain_analyze_object`]'s approach but aggregated per object
/// rather than per operator: builds the text of a SQL query over
/// `mz_introspection` relations that reports memory and/or CPU statistics for
/// every mappable object on the cluster, then returns the text (`AS SQL`) or
/// plans it like a `SHOW` statement.
pub fn plan_explain_analyze_cluster(
    scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
    _params: &Params,
) -> Result<Plan, PlanError> {
    // Output columns of the generated query, depending on options:
    //
    // object string
    // worker_id uint64        (if           skew)
    // memory_ratio numeric    (if memory && skew)
    // worker_memory string    (if memory && skew)
    // avg_memory string       (if memory && skew)
    // total_memory string     (if memory)
    // records_ratio numeric   (if memory && skew)
    // worker_records          (if memory && skew)
    // avg_records numeric     (if memory && skew)
    // total_records numeric   (if memory)
    // cpu_ratio numeric       (if cpu    && skew)
    // worker_elapsed interval (if cpu    && skew)
    // avg_elapsed interval    (if cpu    && skew)
    // total_elapsed interval  (if cpu)

    // The pieces below are accumulated per requested property and stitched
    // together at the end following this template:

    /* WITH {CTEs}
       SELECT mo.name AS object
             {columns}
        FROM mz_introspection.mz_mappable_objects mo
             {from}
       WHERE {predicates}
       ORDER BY {order_by}, mo.name DESC
    */
    // NOTE(review): the capacity is only a hint — skew mode below pushes five
    // CTEs per property, not two; `Vec` grows as needed either way.
    let mut ctes = Vec::with_capacity(4); // max 2 per ExplainAnalyzeComputationProperty
    let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
    let mut predicates = vec![];
    let mut order_by = vec![];

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
    // First skew-enabled property claims the `worker_id` output column;
    // subsequent ones add a join predicate against it instead.
    let mut worker_id = None;
    let mut seen_properties = BTreeSet::new();
    for property in properties {
        // handle each property only once (belt and suspenders)
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            ExplainAnalyzeComputationProperty::Memory => {
                if skew {
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        // join condition if we're showing skew for more than one property
                        predicates.push(format!("om.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("om.worker_id");
                        columns.push("om.worker_id AS worker_id");
                        set_worker_id = true; // we'll add ourselves to `order_by` later
                    };

                    // computes the average memory per LIR operator (for per operator ratios)
                    ctes.push((
                    "per_operator_memory_summary",
                    r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mas.size) AS total_memory,
       SUM(mas.records) AS total_records,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
FROM        mz_introspection.mz_lir_mapping mlm
 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
         ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                ));

                    // computes the memory per worker in a per operator way
                    ctes.push((
                    "per_operator_memory_per_worker",
                    r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mas.worker_id AS worker_id,
       SUM(mas.size) AS worker_memory,
       SUM(mas.records) AS worker_records
FROM        mz_introspection.mz_lir_mapping mlm
 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
       JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
         ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                    ));

                    // computes memory ratios per worker per operator
                    ctes.push((
                    "per_operator_memory_ratios",
                    r#"
SELECT pompw.global_id AS global_id,
       pompw.lir_id AS lir_id,
       pompw.worker_id AS worker_id,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
  FROM      per_operator_memory_per_worker pompw
       JOIN per_operator_memory_summary poms
         USING (global_id, lir_id)
"#,
                    ));

                    // summarizes each object, per worker
                    ctes.push((
                        "object_memory",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.worker_id AS worker_id,
       MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
       MAX(pomr.records_ratio) AS max_operator_records_ratio,
       SUM(pompw.worker_memory) AS worker_memory,
       SUM(pompw.worker_records) AS worker_records
FROM        per_operator_memory_per_worker pompw
     JOIN   per_operator_memory_ratios pomr
     USING (global_id, worker_id, lir_id)
GROUP BY pompw.global_id, pompw.worker_id
"#,
                    ));

                    // summarizes each worker
                    ctes.push(("object_average_memory", r#"
SELECT om.global_id AS global_id,
       SUM(om.worker_memory) AS total_memory,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
       SUM(om.worker_records) AS total_records,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
  FROM object_memory om
GROUP BY om.global_id"#));

                    from.push("LEFT JOIN object_memory om USING (global_id)");
                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");

                    columns.extend([
                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
                        "pg_size_pretty(om.worker_memory) AS worker_memory",
                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
                        "pg_size_pretty(oam.total_memory) AS total_memory",
                        "om.max_operator_records_ratio AS max_operator_records_ratio",
                        "om.worker_records AS worker_records",
                        "oam.avg_records AS avg_records",
                        "oam.total_records AS total_records",
                    ]);

                    // Most skewed objects first.
                    order_by.extend([
                        "max_operator_memory_ratio DESC",
                        "max_operator_records_ratio DESC",
                        "om.worker_memory DESC",
                        "worker_records DESC",
                    ]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // no skew, so just compute totals
                    ctes.push((
                        "per_operator_memory_totals",
                        r#"
    SELECT mlm.global_id AS global_id,
           mlm.lir_id AS lir_id,
           SUM(mas.size) AS total_memory,
           SUM(mas.records) AS total_records
    FROM        mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
           JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
             ON (mas.operator_id = valid_id)
    GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_memory_totals",
                        r#"
SELECT pomt.global_id AS global_id,
       SUM(pomt.total_memory) AS total_memory,
       SUM(pomt.total_records) AS total_records
FROM per_operator_memory_totals pomt
GROUP BY pomt.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
                    columns.extend([
                        "pg_size_pretty(omt.total_memory) AS total_memory",
                        "omt.total_records AS total_records",
                    ]);
                    order_by.extend(["omt.total_memory DESC", "total_records DESC"]);
                }
            }
            ExplainAnalyzeComputationProperty::Cpu => {
                if skew {
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        // join condition if we're showing skew for more than one property
                        predicates.push(format!("oc.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("oc.worker_id");
                        columns.push("oc.worker_id AS worker_id");
                        set_worker_id = true; // we'll add ourselves to `order_by` later
                    };

                    // computes the average memory per LIR operator (for per operator ratios)
                    ctes.push((
    "per_operator_cpu_summary",
    r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mse.elapsed_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
FROM       mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
        ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
));

                    // computes the CPU per worker in a per operator way
                    ctes.push((
                        "per_operator_cpu_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mse.worker_id AS worker_id,
       SUM(mse.elapsed_ns) AS worker_ns
FROM       mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
      JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
        ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                    ));

                    // computes CPU ratios per worker per operator
                    ctes.push((
                        "per_operator_cpu_ratios",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.lir_id AS lir_id,
       pocpw.worker_id AS worker_id,
       CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
FROM      per_operator_cpu_per_worker pocpw
     JOIN per_operator_cpu_summary pocs
     USING (global_id, lir_id)
"#,
                    ));

                    // summarizes each object, per worker
                    // (the `pomr` alias is carried over from the memory variant)
                    ctes.push((
                        "object_cpu",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.worker_id AS worker_id,
       MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
       SUM(pocpw.worker_ns) AS worker_ns
FROM      per_operator_cpu_per_worker pocpw
     JOIN per_operator_cpu_ratios pomr
     USING (global_id, worker_id, lir_id)
GROUP BY pocpw.global_id, pocpw.worker_id
"#,
                    ));

                    // summarizes each worker
                    ctes.push((
                        "object_average_cpu",
                        r#"
SELECT oc.global_id AS global_id,
       SUM(oc.worker_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
  FROM object_cpu oc
GROUP BY oc.global_id"#,));

                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");

                    columns.extend([
                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
                    ]);

                    order_by.extend(["max_operator_cpu_ratio DESC", "worker_elapsed DESC"]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // no skew, so just compute totals
                    ctes.push((
                        "per_operator_cpu_totals",
                        r#"
    SELECT mlm.global_id AS global_id,
           mlm.lir_id AS lir_id,
           SUM(mse.elapsed_ns) AS total_ns
    FROM        mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
           JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
             ON (mse.id = valid_id)
    GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_cpu_totals",
                        r#"
SELECT poct.global_id AS global_id,
       SUM(poct.total_ns) AS total_ns
FROM per_operator_cpu_totals poct
GROUP BY poct.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
                    columns
                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
                    order_by.extend(["total_elapsed DESC"]);
                }
            }
        }
    }

    // generate SQL query text
    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    let predicates = if !predicates.is_empty() {
        format!("WHERE {}", separated(" AND ", predicates))
    } else {
        String::new()
    };
    // add mo.name last, to break ties only
    order_by.push("mo.name DESC");
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
{predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // `AS SQL`: return the pretty-printed query text as a single row.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        // Otherwise plan the generated query like a SHOW statement.
        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        show_select.plan()
    }
}
1464
1465pub fn plan_explain_timestamp(
1466    scx: &StatementContext,
1467    explain: ExplainTimestampStatement<Aug>,
1468) -> Result<Plan, PlanError> {
1469    let (format, _verbose_syntax) = match explain.format() {
1470        mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1471        mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1472        mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1473        mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1474    };
1475
1476    let raw_plan = {
1477        let query::PlannedRootQuery {
1478            expr: raw_plan,
1479            desc: _,
1480            finishing: _,
1481            scope: _,
1482        } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1483        if raw_plan.contains_parameters()? {
1484            return Err(PlanError::ParameterNotAllowed(
1485                "EXPLAIN TIMESTAMP".to_string(),
1486            ));
1487        }
1488
1489        raw_plan
1490    };
1491    let when = query::plan_as_of(scx, explain.select.as_of)?;
1492
1493    Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1494        format,
1495        raw_plan,
1496        when,
1497    }))
1498}
1499
1500/// Plans and decorrelates a [`Query`]. Like [`query::plan_root_query`], but
1501/// returns an [`MirRelationExpr`], which cannot include correlated expressions.
1502#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."]
1503pub fn plan_query(
1504    scx: &StatementContext,
1505    query: Query<Aug>,
1506    params: &Params,
1507    lifetime: QueryLifetime,
1508) -> Result<query::PlannedRootQuery<MirRelationExpr>, PlanError> {
1509    let query::PlannedRootQuery {
1510        mut expr,
1511        desc,
1512        finishing,
1513        scope,
1514    } = query::plan_root_query(scx, query, lifetime)?;
1515    expr.bind_parameters(scx, lifetime, params)?;
1516
1517    Ok(query::PlannedRootQuery {
1518        // No metrics passed! One more reason not to use this deprecated function.
1519        expr: expr.lower(scx.catalog.system_vars(), None)?,
1520        desc,
1521        finishing,
1522        scope,
1523    })
1524}
1525
// Typed extraction of the `SUBSCRIBE ... WITH (...)` options: `SNAPSHOT` and
// `PROGRESS`, both booleans. Produces `SubscribeOptionExtracted`.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1527
/// Computes the output relation description of a `SUBSCRIBE` statement.
///
/// The output always leads with an `mz_timestamp` column, followed by an
/// `mz_progressed` column when the `PROGRESS` option is set. The remaining
/// columns depend on the output mode: plain diffs carry an `mz_diff` column
/// plus the relation's columns; the upsert/Debezium envelopes carry an
/// `mz_state` column, the key columns, and the (before/)after value columns.
pub fn describe_subscribe(
    scx: &StatementContext,
    stmt: SubscribeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // Determine the description of the subscribed relation, whether it is a
    // named catalog item or an inline query.
    let relation_desc = match stmt.relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            match item.relation_desc() {
                Some(desc) => desc.into_owned(),
                None => sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                ),
            }
        }
        SubscribeRelation::Query(query) => {
            let query::PlannedRootQuery { desc, .. } =
                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
            desc
        }
    };
    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
    let progress = progress.unwrap_or(false);
    // Every subscribe output starts with the timestamp of the update.
    let mut desc = RelationDesc::builder().with_column(
        "mz_timestamp",
        SqlScalarType::Numeric {
            max_scale: Some(NumericMaxScale::ZERO),
        }
        .nullable(false),
    );
    if progress {
        desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
    }

    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
    match stmt.output {
        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
            desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
            for (name, mut ty) in relation_desc.into_iter() {
                // Progress rows carry NULLs in the data columns, so when
                // PROGRESS is requested every data column becomes nullable.
                if progress {
                    ty.nullable = true;
                }
                desc = desc.with_column(name, ty);
            }
        }
        SubscribeOutput::EnvelopeUpsert { key_columns }
        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
            desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
            let key_columns = key_columns
                .into_iter()
                .map(normalize::column_name)
                .collect_vec();
            let mut before_values_desc = RelationDesc::builder();
            let mut after_values_desc = RelationDesc::builder();

            // Add the key columns in the order that they're specified.
            for column_name in &key_columns {
                let mut column_ty = relation_desc
                    .get_by_name(column_name)
                    .map(|(_pos, ty)| ty.clone())
                    .ok_or_else(|| PlanError::UnknownColumn {
                        table: None,
                        column: column_name.clone(),
                        similar: Box::new([]),
                    })?;
                if progress {
                    column_ty.nullable = true;
                }
                desc = desc.with_column(column_name, column_ty);
            }

            // Then add the remaining columns in the order from the original
            // table, filtering out the key columns since we added those above.
            for (mut name, mut ty) in relation_desc
                .into_iter()
                .filter(|(name, _ty)| !key_columns.contains(name))
            {
                // Non-key value columns are always nullable in these
                // envelopes (e.g. they are absent on deletion).
                ty.nullable = true;
                before_values_desc =
                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
                if debezium {
                    name = format!("after_{}", name).into();
                }
                after_values_desc = after_values_desc.with_column(name, ty);
            }

            // Debezium output carries both the before and the after values;
            // upsert output carries only the current values.
            if debezium {
                desc = desc.concat(before_values_desc);
            }
            desc = desc.concat(after_values_desc);
        }
    }
    Ok(StatementDesc::new(Some(desc.finish())))
}
1623
/// Plans a `SUBSCRIBE` statement into a [`Plan::Subscribe`].
///
/// `copy_to` is set when the subscribe runs as `COPY (SUBSCRIBE ...) TO
/// STDOUT`, in which case the output is additionally formatted accordingly.
pub fn plan_subscribe(
    scx: &StatementContext,
    SubscribeStatement {
        relation,
        options,
        as_of,
        up_to,
        output,
    }: SubscribeStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<Plan, PlanError> {
    // Resolve the subscribe target: either an existing catalog item
    // (subscribed to by its global id) or an inline query planned here.
    let (from, desc, scope) = match relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            let Some(desc) = item.relation_desc() else {
                sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                );
            };
            let item_name = match name {
                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
                _ => None,
            };
            // Build a scope from the item's columns so that key/ordering
            // expressions below can reference them by name.
            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
            (
                SubscribeFrom::Id(item.global_id()),
                desc.into_owned(),
                scope,
            )
        }
        SubscribeRelation::Query(query) => {
            #[allow(deprecated)] // TODO(aalexandrov): Use HirRelationExpr in Subscribe
            let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?;
            // There's no way to apply finishing operations to a `SUBSCRIBE` directly, so the
            // finishing should have already been turned into a `TopK` by
            // `plan_query` / `plan_root_query`, upon seeing the `QueryLifetime::Subscribe`.
            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
                &query.finishing,
                query.desc.arity()
            ));
            let desc = query.desc.clone();
            (
                SubscribeFrom::Query {
                    expr: query.expr,
                    desc: query.desc,
                },
                desc,
                query.scope,
            )
        }
    };

    // Plan the optional AS OF and UP TO frontier bounds.
    let when = query::plan_as_of(scx, as_of)?;
    let up_to = up_to
        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
        .transpose()?;

    // Expression context for planning the KEY / ORDER BY expressions of the
    // output clauses below. Subqueries and parameters are permitted;
    // aggregates and window functions are not.
    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
    let ecx = ExprContext {
        qcx: &qcx,
        name: "",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
    let output = match output {
        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
        SubscribeOutput::EnvelopeUpsert { key_columns } => {
            // Plan the key columns as a synthetic ORDER BY over plain column
            // references; any planned map expressions mean a key was not a
            // bare column, which is not allowed.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE UPSERT KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
            }
            plan::SubscribeOutput::EnvelopeUpsert {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::EnvelopeDebezium { key_columns } => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
            // Same key-planning scheme as ENVELOPE UPSERT above, but behind a
            // feature flag and with its own error variant.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE DEBEZIUM KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
            }
            plan::SubscribeOutput::EnvelopeDebezium {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
            // `mz_diff` is orderable here, so prepend it to the visible
            // columns (shifting the real columns by one position).
            let mz_diff = "mz_diff".into();
            let output_columns = std::iter::once((0, &mz_diff))
                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
                .collect_vec();
            match query::plan_order_by_exprs(
                &ExprContext {
                    name: "WITHIN TIMESTAMP ORDER BY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            ) {
                Err(PlanError::UnknownColumn {
                    table: None,
                    column,
                    similar: _,
                }) if &column == &mz_diff => {
                    // mz_diff is being used in an expression. Since mz_diff isn't part of the table
                    // it looks like an unknown column. Instead, return a better error
                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                }
                Err(e) => return Err(e),
                Ok((order_by, map_exprs)) => {
                    // As with the envelopes above, ordering must be by plain
                    // columns: planned map expressions are rejected.
                    if !map_exprs.is_empty() {
                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                    }

                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
                }
            }
        }
    };

    // SNAPSHOT defaults to true, PROGRESS to false.
    let SubscribeOptionExtracted {
        progress, snapshot, ..
    } = options.try_into()?;
    Ok(Plan::Subscribe(SubscribePlan {
        from,
        when,
        up_to,
        with_snapshot: snapshot.unwrap_or(true),
        copy_to,
        emit_progress: progress.unwrap_or(false),
        output,
    }))
}
1796
1797pub fn describe_copy_from_table(
1798    scx: &StatementContext,
1799    table_name: <Aug as AstInfo>::ItemName,
1800    columns: Vec<Ident>,
1801) -> Result<StatementDesc, PlanError> {
1802    let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1803    Ok(StatementDesc::new(Some(desc)))
1804}
1805
1806pub fn describe_copy_item(
1807    scx: &StatementContext,
1808    object_name: <Aug as AstInfo>::ItemName,
1809    columns: Vec<Ident>,
1810) -> Result<StatementDesc, PlanError> {
1811    let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1812    Ok(StatementDesc::new(Some(desc)))
1813}
1814
1815pub fn describe_copy(
1816    scx: &StatementContext,
1817    CopyStatement {
1818        relation,
1819        direction,
1820        ..
1821    }: CopyStatement<Aug>,
1822) -> Result<StatementDesc, PlanError> {
1823    Ok(match (relation, direction) {
1824        (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1825            describe_copy_item(scx, name, columns)?
1826        }
1827        (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1828            describe_copy_from_table(scx, name, columns)?
1829        }
1830        (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1831        (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1832    }
1833    .with_is_copy())
1834}
1835
/// Plans `COPY ... TO <expr>`: copying the result of `select_plan` to an
/// external destination named by the string expression `to`, using an AWS
/// connection.
///
/// Errors unless an AWS connection is supplied, the format is CSV or Parquet,
/// and the requested MAX FILE SIZE lies within the supported S3 sink bounds.
fn plan_copy_to_expr(
    scx: &StatementContext,
    select_plan: SelectPlan,
    desc: RelationDesc,
    to: &Expr<Aug>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // An AWS connection is mandatory for COPY TO <expr>.
    let conn_id = match options.aws_connection {
        Some(conn_id) => CatalogItemId::from(conn_id),
        None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
    };
    let connection = scx.get_item(&conn_id).connection()?;

    match connection {
        mz_storage_types::connections::Connection::Aws(_) => {}
        _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
    }

    // Translate the COPY format (plus CSV-specific options) into the S3 sink
    // format. Only CSV and Parquet are supported.
    let format = match format {
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            S3SinkFormat::PgCopy(CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            ))
        }
        CopyFormat::Parquet => {
            // Validate that the output desc can be formatted as parquet
            ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
            S3SinkFormat::Parquet
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
    };

    // Converting the to expr to a HirScalarExpr
    let mut to_expr = to.clone();
    transform_ast::transform(scx, &mut to_expr)?;
    // The destination expression is planned against an empty relation: it may
    // not reference any columns, aggregates, subqueries, or parameters.
    let relation_type = RelationDesc::empty();
    let ecx = &ExprContext {
        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
        name: "COPY TO target",
        scope: &Scope::empty(),
        relation_type: relation_type.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };

    // The destination must be (coercible to) a string.
    let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;

    // Enforce the supported S3 sink file-size bounds.
    if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
        sql_bail!(
            "MAX FILE SIZE cannot be less than {}",
            MIN_S3_SINK_FILE_SIZE
        );
    }
    if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
        sql_bail!(
            "MAX FILE SIZE cannot be greater than {}",
            MAX_S3_SINK_FILE_SIZE
        );
    }

    Ok(Plan::CopyTo(CopyToPlan {
        select_plan,
        desc,
        to,
        connection: connection.to_owned(),
        connection_id: conn_id,
        format,
        max_file_size: options.max_file_size.as_bytes(),
    }))
}
1920
/// Plans `COPY <table> FROM <target>`: ingesting data into `table_name` from
/// stdin, a URL, or an S3 location (when an AWS connection is supplied).
///
/// Format options are validated against the chosen format: TEXT rejects the
/// CSV-only options, CSV accepts all of them, Parquet takes none, and BINARY
/// is unsupported.
fn plan_copy_from(
    scx: &StatementContext,
    target: &CopyTarget<Aug>,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // Rejects an option that is only meaningful in CSV mode.
    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
        match option {
            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
            None => Ok(()),
        }
    }

    // Determine where the copied data comes from.
    let source = match target {
        CopyTarget::Stdin => CopyFromSource::Stdin,
        CopyTarget::Expr(from) => {
            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;

            // Converting the expr to an HirScalarExpr
            let mut from_expr = from.clone();
            transform_ast::transform(scx, &mut from_expr)?;
            // The source expression is planned against an empty relation: it
            // may not reference columns, aggregates, subqueries, or
            // parameters, and must be (coercible to) a string.
            let relation_type = RelationDesc::empty();
            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "COPY FROM target",
                scope: &Scope::empty(),
                relation_type: relation_type.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;

            // With an AWS connection the expression names an S3 location;
            // without one it is treated as a URL.
            match options.aws_connection {
                Some(conn_id) => {
                    let conn_id = CatalogItemId::from(conn_id);

                    // Validate the connection type is one we expect.
                    let connection = match scx.get_item(&conn_id).connection()? {
                        mz_storage_types::connections::Connection::Aws(conn) => conn,
                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
                    };

                    CopyFromSource::AwsS3 {
                        uri: from,
                        connection,
                        connection_id: conn_id,
                    }
                }
                None => CopyFromSource::Url(from),
            }
        }
        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
    };

    // Build the per-format parsing parameters.
    let params = match format {
        CopyFormat::Text => {
            only_available_with_csv(options.quote, "quote")?;
            only_available_with_csv(options.escape, "escape")?;
            only_available_with_csv(options.header, "HEADER")?;
            // TEXT defaults: tab delimiter, `\N` for NULL.
            let delimiter =
                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
            let null = match options.null {
                Some(null) => Cow::from(null),
                None => Cow::from("\\N"),
            };
            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
        }
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            )
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Parquet => CopyFormatParams::Parquet,
    };

    // FILES and PATTERN are mutually exclusive filters on the source.
    // NOTE(review): `bail_unsupported!` is used for the conflict case even
    // though this is a usage error rather than a missing feature — confirm
    // the rendered message reads as intended (vs. using `sql_bail!`).
    let filter = match (options.files, options.pattern) {
        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
        (Some(files), None) => Some(CopyFromFilter::Files(files)),
        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
        (None, None) => None,
    };

    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
    }

    let table_name_string = table_name.full_name_str();

    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;

    let Some(mfp) = maybe_mfp else {
        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
    };

    Ok(Plan::CopyFrom(CopyFromPlan {
        target_id: id,
        target_name: table_name_string,
        source,
        columns,
        source_desc,
        mfp,
        params,
        filter,
    }))
}
2041
2042fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2043    match v {
2044        Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2045        Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2046        None => Ok(None),
2047    }
2048}
2049
// Typed extraction of the WITH options accepted by COPY statements into a
// `CopyOptionExtracted` struct. MAX FILE SIZE defaults to 256 MB; all other
// options default to absent.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
2063
/// Plans a `COPY` statement, dispatching on its direction and target:
/// `COPY ... TO STDOUT` (formatted select/subscribe output), `COPY ... FROM`
/// (ingestion into a table), or `COPY ... TO <expr>` (export via an AWS
/// connection).
pub fn plan_copy(
    scx: &StatementContext,
    CopyStatement {
        relation,
        direction,
        target,
        options,
    }: CopyStatement<Aug>,
) -> Result<Plan, PlanError> {
    let options = CopyOptionExtracted::try_from(options)?;
    // Parse any user-provided FORMAT option. If not provided, will default to
    // Text for COPY TO STDOUT and COPY FROM STDIN, but will error for COPY TO <expr>.
    let format = options
        .format
        .as_ref()
        .map(|format| match format.to_lowercase().as_str() {
            "text" => Ok(CopyFormat::Text),
            "csv" => Ok(CopyFormat::Csv),
            "binary" => Ok(CopyFormat::Binary),
            "parquet" => Ok(CopyFormat::Parquet),
            _ => sql_bail!("unknown FORMAT: {}", format),
        })
        .transpose()?;

    match (&direction, &target) {
        (CopyDirection::To, CopyTarget::Stdout) => {
            // These options are accepted by the grammar but not implemented
            // for COPY TO STDOUT yet.
            if options.delimiter.is_some() {
                sql_bail!("COPY TO does not support DELIMITER option yet");
            }
            if options.quote.is_some() {
                sql_bail!("COPY TO does not support QUOTE option yet");
            }
            if options.null.is_some() {
                sql_bail!("COPY TO does not support NULL option yet");
            }
            // Plan the inner SELECT or SUBSCRIBE with the copy format
            // attached (defaulting to TEXT).
            match relation {
                CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
                CopyRelation::Select(stmt) => Ok(plan_select(
                    scx,
                    stmt,
                    &Params::empty(),
                    Some(format.unwrap_or(CopyFormat::Text)),
                )?),
                CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
                    scx,
                    stmt,
                    &Params::empty(),
                    Some(format.unwrap_or(CopyFormat::Text)),
                )?),
            }
        }
        // COPY FROM only supports copying into a named table.
        (CopyDirection::From, target) => match relation {
            CopyRelation::Named { name, columns } => plan_copy_from(
                scx,
                target,
                name,
                columns,
                format.unwrap_or(CopyFormat::Text),
                options,
            ),
            _ => sql_bail!("COPY FROM {} not supported", target),
        },
        (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
            // System users are always allowed to use this feature, even when
            // the flag is disabled, so that we can dogfood for analytics in
            // production environments. The feature is stable enough that we're
            // not worried about it crashing.
            if !scx.catalog.active_role_id().is_system() {
                scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
            }

            // Unlike COPY TO STDOUT, there is no default format here.
            let format = match format {
                Some(inner) => inner,
                _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
            };

            // Normalize the relation to a SELECT statement.
            let stmt = match relation {
                CopyRelation::Named { name, columns } => {
                    if !columns.is_empty() {
                        // TODO(mouli): Add support for this
                        sql_bail!(
                            "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
                        );
                    }
                    // Generate a synthetic SELECT query that just gets the table
                    let query = Query {
                        ctes: CteBlock::empty(),
                        body: SetExpr::Table(name),
                        order_by: vec![],
                        limit: None,
                        offset: None,
                    };
                    SelectStatement { query, as_of: None }
                }
                CopyRelation::Select(stmt) => {
                    if !stmt.query.order_by.is_empty() {
                        sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
                    }
                    stmt
                }
                _ => sql_bail!("COPY {} {} not supported", direction, target),
            };

            let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
            plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
        }
        _ => sql_bail!("COPY {} {} not supported", direction, target),
    }
}