1use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19
20use mz_arrow_util::builder::ArrowBuilder;
21use mz_expr::visit::Visit;
22use mz_expr::{MirRelationExpr, RowSetFinishing};
23use mz_ore::num::NonNeg;
24use mz_ore::soft_panic_or_log;
25use mz_ore::str::separated;
26use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
27use mz_repr::adt::numeric::NumericMaxScale;
28use mz_repr::bytes::ByteSize;
29use mz_repr::explain::{ExplainConfig, ExplainFormat};
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
32use mz_sql_parser::ast::{
33 CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
34 ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
35 ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
36 ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
37 SetExpr, SubscribeOutput, UnresolvedItemName,
38};
39use mz_sql_parser::ident;
40use mz_storage_types::sinks::{
41 KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
42 MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
43};
44
45use crate::ast::display::AstDisplay;
46use crate::ast::{
47 AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
48 DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
49 SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
50 UpdateStatement,
51};
52use crate::catalog::CatalogItemType;
53use crate::names::{Aug, ResolvedItemName};
54use crate::normalize;
55use crate::plan::query::{
56 ExprContext, QueryLifetime, offset_into_value, plan_as_of_or_up_to, plan_expr,
57};
58use crate::plan::scope::Scope;
59use crate::plan::statement::show::ShowSelect;
60use crate::plan::statement::{StatementContext, StatementDesc, ddl};
61use crate::plan::{
62 self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
63 ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
64};
65use crate::plan::{
66 CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
67 QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
68};
69use crate::plan::{CopyFromSource, with_options};
70use crate::session::vars::{
71 self, DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF, ENABLE_COPY_FROM_REMOTE,
72};
73
74pub fn describe_insert(
81 scx: &StatementContext,
82 InsertStatement {
83 table_name,
84 columns,
85 source,
86 returning,
87 }: InsertStatement<Aug>,
88) -> Result<StatementDesc, PlanError> {
89 let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
90 let desc = if returning.expr.is_empty() {
91 None
92 } else {
93 Some(returning.desc)
94 };
95 Ok(StatementDesc::new(desc))
96}
97
98pub fn plan_insert(
99 scx: &StatementContext,
100 InsertStatement {
101 table_name,
102 columns,
103 source,
104 returning,
105 }: InsertStatement<Aug>,
106 params: &Params,
107) -> Result<Plan, PlanError> {
108 let (id, mut expr, returning) =
109 query::plan_insert_query(scx, table_name, columns, source, returning)?;
110 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
111 let returning = returning
112 .expr
113 .into_iter()
114 .map(|mut expr| {
115 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
116 expr.lower_uncorrelated(scx.catalog.system_vars())
117 })
118 .collect::<Result<Vec<_>, _>>()?;
119
120 Ok(Plan::Insert(InsertPlan {
121 id,
122 values: expr,
123 returning,
124 }))
125}
126
127pub fn describe_delete(
128 scx: &StatementContext,
129 stmt: DeleteStatement<Aug>,
130) -> Result<StatementDesc, PlanError> {
131 query::plan_delete_query(scx, stmt)?;
132 Ok(StatementDesc::new(None))
133}
134
135pub fn plan_delete(
136 scx: &StatementContext,
137 stmt: DeleteStatement<Aug>,
138 params: &Params,
139) -> Result<Plan, PlanError> {
140 let rtw_plan = query::plan_delete_query(scx, stmt)?;
141 plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
142}
143
144pub fn describe_update(
145 scx: &StatementContext,
146 stmt: UpdateStatement<Aug>,
147) -> Result<StatementDesc, PlanError> {
148 query::plan_update_query(scx, stmt)?;
149 Ok(StatementDesc::new(None))
150}
151
152pub fn plan_update(
153 scx: &StatementContext,
154 stmt: UpdateStatement<Aug>,
155 params: &Params,
156) -> Result<Plan, PlanError> {
157 let rtw_plan = query::plan_update_query(scx, stmt)?;
158 plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
159}
160
161pub fn plan_read_then_write(
162 scx: &StatementContext,
163 kind: MutationKind,
164 params: &Params,
165 query::ReadThenWritePlan {
166 id,
167 mut selection,
168 finishing,
169 assignments,
170 }: query::ReadThenWritePlan,
171) -> Result<Plan, PlanError> {
172 selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
173 let mut assignments_outer = BTreeMap::new();
174 for (idx, mut set) in assignments {
175 set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
176 let set = set.lower_uncorrelated(scx.catalog.system_vars())?;
177 assignments_outer.insert(idx, set);
178 }
179
180 Ok(Plan::ReadThenWrite(ReadThenWritePlan {
181 id,
182 selection,
183 finishing,
184 assignments: assignments_outer,
185 kind,
186 returning: Vec::new(),
187 }))
188}
189
190pub fn describe_select(
191 scx: &StatementContext,
192 stmt: SelectStatement<Aug>,
193) -> Result<StatementDesc, PlanError> {
194 if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
195 return Ok(StatementDesc::new(Some(desc)));
196 }
197
198 let query::PlannedRootQuery { desc, .. } =
199 query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
200 Ok(StatementDesc::new(Some(desc)))
201}
202
203pub fn plan_select(
204 scx: &StatementContext,
205 select: SelectStatement<Aug>,
206 params: &Params,
207 copy_to: Option<CopyFormat>,
208) -> Result<Plan, PlanError> {
209 if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
210 return Ok(Plan::SideEffectingFunc(f));
211 }
212
213 let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
214 Ok(Plan::Select(plan))
215}
216
/// Plans a `SELECT` statement into a [`SelectPlan`] plus the relation
/// description of its output.
///
/// Binds placeholder parameters throughout the query (including inside
/// `TOP K` offsets and the top-level `LIMIT`/`OFFSET`), and enforces that the
/// top-level `LIMIT` is a constant non-negative integer or `NULL`.
fn plan_select_inner(
    scx: &StatementContext,
    select: SelectStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<(SelectPlan, RelationDesc), PlanError> {
    // Resolve the AS OF clause (if any) into a query timestamp selection.
    let when = query::plan_as_of(scx, select.as_of.clone())?;
    let lifetime = QueryLifetime::OneShot;
    let query::PlannedRootQuery {
        mut expr,
        desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
    expr.bind_parameters(scx, lifetime, params)?;

    // TopK offsets may still contain parameter references after binding above
    // missed them (they live inside the relation expression). Evaluate each
    // offset down to a constant Int64 literal in place.
    expr.try_visit_mut_pre(&mut |expr| {
        if let HirRelationExpr::TopK { offset, .. } = expr {
            let offset_value = offset_into_value(offset.take())?;
            *offset = HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64);
        }
        Ok::<(), PlanError>(())
    })?;
    // The top-level LIMIT must reduce to a literal: NULL means "no limit",
    // and a non-negative Int64 becomes the row limit.
    let limit = match finishing.limit {
        None => None,
        Some(mut limit) => {
            limit.bind_parameters(scx, lifetime, params)?;
            let Some(limit) = limit.as_literal() else {
                sql_bail!(
                    "Top-level LIMIT must be a constant expression, got {}",
                    limit
                )
            };
            match limit {
                Datum::Null => None,
                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
                _ => {
                    // Earlier planning should have rejected invalid literals;
                    // reaching here indicates a planner bug.
                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
                    sql_bail!("LIMIT must be a non-negative INT or NULL")
                }
            }
        }
    };
    // The top-level OFFSET is likewise forced to a constant; negativity is
    // rejected inside `offset_into_value`, so the conversion cannot fail.
    let offset = {
        let mut offset = finishing.offset.clone();
        offset.bind_parameters(scx, lifetime, params)?;
        let offset = offset_into_value(offset.take())?;
        offset
            .try_into()
            .expect("checked in offset_into_value that it is not negative")
    };

    // When the feature flag is set, AS OF queries may not contain
    // unmaterializable functions (other than temporal ones such as `mz_now`).
    if scx.is_feature_flag_enabled(&DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF)
        && select.as_of.is_some()
        && expr.contains_unmaterializable_except_temporal()?
    {
        bail_unsupported!("unmaterializable function (except `mz_now`) in an AS OF query");
    }

    let plan = SelectPlan {
        source: expr,
        when,
        finishing: RowSetFinishing {
            limit,
            offset,
            project: finishing.project,
            order_by: finishing.order_by,
        },
        copy_to,
        select: Some(Box::new(select)),
    };

    Ok((plan, desc))
}
306
307pub fn describe_explain_plan(
308 scx: &StatementContext,
309 explain: ExplainPlanStatement<Aug>,
310) -> Result<StatementDesc, PlanError> {
311 let mut relation_desc = RelationDesc::builder();
312
313 match explain.stage() {
314 ExplainStage::RawPlan => {
315 let name = "Raw Plan";
316 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
317 }
318 ExplainStage::DecorrelatedPlan => {
319 let name = "Decorrelated Plan";
320 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
321 }
322 ExplainStage::LocalPlan => {
323 let name = "Locally Optimized Plan";
324 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
325 }
326 ExplainStage::GlobalPlan => {
327 let name = "Optimized Plan";
328 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
329 }
330 ExplainStage::PhysicalPlan => {
331 let name = "Physical Plan";
332 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
333 }
334 ExplainStage::Trace => {
335 relation_desc = relation_desc
336 .with_column("Time", SqlScalarType::UInt64.nullable(false))
337 .with_column("Path", SqlScalarType::String.nullable(false))
338 .with_column("Plan", SqlScalarType::String.nullable(false));
339 }
340 ExplainStage::PlanInsights => {
341 let name = "Plan Insights";
342 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
343 }
344 };
345 let relation_desc = relation_desc.finish();
346
347 Ok(
348 StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
349 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
350 _ => vec![],
351 }),
352 )
353}
354
355pub fn describe_explain_pushdown(
356 scx: &StatementContext,
357 statement: ExplainPushdownStatement<Aug>,
358) -> Result<StatementDesc, PlanError> {
359 let relation_desc = RelationDesc::builder()
360 .with_column("Source", SqlScalarType::String.nullable(false))
361 .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
362 .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
363 .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
364 .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
365 .finish();
366
367 Ok(
368 StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
369 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
370 _ => vec![],
371 }),
372 )
373}
374
/// Describes the result of an `EXPLAIN ANALYZE ... <object>` statement.
///
/// With `AS SQL`, the output is a single `SQL` column containing the query
/// text. Otherwise the columns depend on the requested properties (memory
/// and/or CPU, optionally with per-worker skew breakdowns) or on `HINTS`.
/// NOTE(review): the column order here must stay in sync with the SQL
/// generated by `plan_explain_analyze_object` — confirm when changing either.
pub fn describe_explain_analyze_object(
    _scx: &StatementContext,
    statement: ExplainAnalyzeObjectStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    match statement.properties {
        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
            properties,
            skew,
        }) => {
            let mut relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(false));

            // Per-worker (skew) output adds a worker id column.
            if skew {
                relation_desc =
                    relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
            }

            // Each property contributes columns at most once, even if the
            // statement repeats it.
            let mut seen_properties = BTreeSet::new();
            for property in properties {
                if !seen_properties.insert(property) {
                    continue;
                }

                match property {
                    // MEMORY with skew: per-worker ratios plus summary stats.
                    ExplainAnalyzeComputationProperty::Memory if skew => {
                        let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                        relation_desc = relation_desc
                            .with_column("memory_ratio", numeric.clone())
                            .with_column("worker_memory", SqlScalarType::String.nullable(true))
                            .with_column("avg_memory", SqlScalarType::String.nullable(true))
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column("records_ratio", numeric.clone())
                            .with_column("worker_records", numeric.clone())
                            .with_column("avg_records", numeric.clone())
                            .with_column("total_records", numeric);
                    }
                    // MEMORY without skew: totals only.
                    ExplainAnalyzeComputationProperty::Memory => {
                        relation_desc = relation_desc
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column(
                                "total_records",
                                SqlScalarType::Numeric { max_scale: None }.nullable(true),
                            );
                    }
                    // CPU: per-worker columns are gated on `skew`; the total
                    // elapsed column is always present.
                    ExplainAnalyzeComputationProperty::Cpu => {
                        if skew {
                            relation_desc = relation_desc
                                .with_column(
                                    "cpu_ratio",
                                    SqlScalarType::Numeric { max_scale: None }.nullable(true),
                                )
                                .with_column(
                                    "worker_elapsed",
                                    SqlScalarType::Interval.nullable(true),
                                )
                                .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
                        }
                        relation_desc = relation_desc
                            .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
                    }
                }
            }

            let relation_desc = relation_desc.finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
        // HINTS: suggested LIMIT hint values and their estimated savings.
        ExplainAnalyzeProperty::Hints => {
            let relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(true))
                .with_column("levels", SqlScalarType::Int64.nullable(true))
                .with_column("to_cut", SqlScalarType::Int64.nullable(true))
                .with_column("hint", SqlScalarType::Float64.nullable(true))
                .with_column("savings", SqlScalarType::String.nullable(true))
                .finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
    }
}
461
/// Describes the result of an `EXPLAIN ANALYZE ... CLUSTER` statement.
///
/// Like [`describe_explain_analyze_object`] but aggregated per dataflow
/// object on the cluster, so output always begins with `object`/`global_id`
/// columns and the skew ratios are per-object maxima.
pub fn describe_explain_analyze_cluster(
    _scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
) -> Result<StatementDesc, PlanError> {
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;

    let mut relation_desc = RelationDesc::builder()
        .with_column("object", SqlScalarType::String.nullable(false))
        .with_column("global_id", SqlScalarType::String.nullable(false));

    // Per-worker (skew) output adds a worker id column.
    if skew {
        relation_desc =
            relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
    }

    // Each property contributes columns at most once, even if repeated.
    let mut seen_properties = BTreeSet::new();
    for property in properties {
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            // MEMORY with skew: per-object max ratios plus summary stats.
            ExplainAnalyzeComputationProperty::Memory if skew => {
                let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                relation_desc = relation_desc
                    .with_column("max_operator_memory_ratio", numeric.clone())
                    .with_column("worker_memory", SqlScalarType::String.nullable(true))
                    .with_column("avg_memory", SqlScalarType::String.nullable(true))
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column("max_operator_records_ratio", numeric.clone())
                    .with_column("worker_records", numeric.clone())
                    .with_column("avg_records", numeric.clone())
                    .with_column("total_records", numeric);
            }
            // MEMORY without skew: totals only.
            ExplainAnalyzeComputationProperty::Memory => {
                relation_desc = relation_desc
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column(
                        "total_records",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    );
            }
            // CPU with skew: per-worker columns plus the total.
            ExplainAnalyzeComputationProperty::Cpu if skew => {
                relation_desc = relation_desc
                    .with_column(
                        "max_operator_cpu_ratio",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    )
                    .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
            // CPU without skew: total only.
            ExplainAnalyzeComputationProperty::Cpu => {
                relation_desc = relation_desc
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
        }
    }

    Ok(StatementDesc::new(Some(relation_desc.finish())))
}
531
532pub fn describe_explain_timestamp(
533 scx: &StatementContext,
534 ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
535) -> Result<StatementDesc, PlanError> {
536 let relation_desc = RelationDesc::builder()
537 .with_column("Timestamp", SqlScalarType::String.nullable(false))
538 .finish();
539
540 Ok(StatementDesc::new(Some(relation_desc))
541 .with_params(describe_select(scx, select)?.param_types))
542}
543
544pub fn describe_explain_schema(
545 _: &StatementContext,
546 ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
547) -> Result<StatementDesc, PlanError> {
548 let relation_desc = RelationDesc::builder()
549 .with_column("Schema", SqlScalarType::String.nullable(false))
550 .finish();
551 Ok(StatementDesc::new(Some(relation_desc)))
552}
553
// Generates `ExplainPlanOptionExtracted`, a struct that extracts the
// `EXPLAIN ... WITH (...)` options below (with the given types and defaults)
// from a list of `ExplainPlanOption`s. Converted to an `ExplainConfig` via
// the `TryFrom` impl that follows.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
596
impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
    type Error = PlanError;

    /// Converts the extracted `WITH (...)` options into an [`ExplainConfig`].
    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
        // `RAW` is shorthand for both `RAW PLANS` and `RAW SYNTAX`.
        if v.raw {
            v.raw_plans = true;
            v.raw_syntax = true;
        }

        // Some options default to enabled only in production builds (i.e.
        // when soft assertions are disabled), so tests see stabler output.
        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();

        Ok(ExplainConfig {
            arity: v.arity.unwrap_or(enable_on_prod),
            cardinality: v.cardinality,
            column_names: v.column_names,
            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
            // Humanized expressions are incompatible with raw plans.
            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
            join_impls: v.join_implementations,
            keys: v.keys,
            // Linear chains are likewise incompatible with raw plans.
            linear_chains: !v.raw_plans && v.linear_chains,
            no_fast_path: v.no_fast_path,
            no_notices: v.no_notices,
            node_ids: v.node_identifiers,
            non_negative: v.non_negative,
            raw_plans: v.raw_plans,
            raw_syntax: v.raw_syntax,
            // Set later from the statement's output format; see
            // `plan_explain_plan`.
            verbose_syntax: false,
            redacted: v.redacted,
            subtree_size: v.subtree_size,
            equivalences: v.equivalences,
            timing: v.timing,
            types: v.types,
            // Optimizer feature overrides: options not settable via EXPLAIN
            // fall back to their defaults.
            features: OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: Default::default(),
                enable_eager_delta_joins: v.enable_eager_delta_joins,
                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
                enable_consolidate_after_union_negate: Default::default(),
                enable_reduce_mfp_fusion: Default::default(),
                enable_cardinality_estimates: Default::default(),
                persist_fast_path_limit: Default::default(),
                reoptimize_imported_views: v.reoptimize_imported_views,
                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse: v
                    .enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: Default::default(),
                enable_dequadratic_eqprop_map: Default::default(),
                enable_eq_classes_withholding_errors: Default::default(),
                enable_fast_path_plan_insights: Default::default(),
                enable_cast_elimination: Default::default(),
            },
        })
    }
}
657
/// Resolves an AST-level [`Explainee`] into a plan-level
/// [`plan::Explainee`].
///
/// Catalog items (views, materialized views, indexes) are validated to have
/// the expected item type. `CREATE ...` statements are planned with
/// IF-NOT-EXISTS behavior forced to `Skip` (EXPLAIN never creates anything),
/// and it is an error for the statement to set that behavior explicitly.
fn plan_explainee(
    scx: &StatementContext,
    explainee: Explainee<Aug>,
    params: &Params,
) -> Result<plan::Explainee, PlanError> {
    use crate::plan::ExplaineeStatement;

    // `REPLAN` variants share the same resolution path as their plain
    // counterparts; remember which one we saw.
    let is_replan = matches!(
        explainee,
        Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
    );

    let explainee = match explainee {
        Explainee::View(name) | Explainee::ReplanView(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            let item_type = item.item_type();
            if item_type != CatalogItemType::View {
                sql_bail!("Expected {name} to be a view, not a {item_type}");
            }
            match is_replan {
                true => crate::plan::Explainee::ReplanView(item.id()),
                false => crate::plan::Explainee::View(item.id()),
            }
        }
        Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            let item_type = item.item_type();
            if item_type != CatalogItemType::MaterializedView {
                sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
            }
            match is_replan {
                true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
                false => crate::plan::Explainee::MaterializedView(item.id()),
            }
        }
        Explainee::Index(name) | Explainee::ReplanIndex(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            let item_type = item.item_type();
            if item_type != CatalogItemType::Index {
                sql_bail!("Expected {name} to be an index, not a {item_type}");
            }
            match is_replan {
                true => crate::plan::Explainee::ReplanIndex(item.id()),
                false => crate::plan::Explainee::Index(item.id()),
            }
        }
        Explainee::Select(select, broken) => {
            // Plan the SELECT in full; the plan and its description are
            // embedded in the explainee.
            let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
            crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
        }
        Explainee::CreateView(mut stmt, broken) => {
            // Force IF-NOT-EXISTS semantics within EXPLAIN; an explicit
            // setting by the user is rejected.
            if stmt.if_exists != IfExistsBehavior::Skip {
                stmt.if_exists = IfExistsBehavior::Skip;
            } else {
                sql_bail!(
                    "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
                 (the behavior is implied within the scope of an enclosing EXPLAIN)"
                );
            }

            let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
                sql_bail!("expected CreateViewPlan plan");
            };

            crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
        }
        Explainee::CreateMaterializedView(mut stmt, broken) => {
            // Same IF-NOT-EXISTS forcing as for CREATE VIEW above.
            if stmt.if_exists != IfExistsBehavior::Skip {
                stmt.if_exists = IfExistsBehavior::Skip;
            } else {
                sql_bail!(
                    "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
                 (the behavior is implied within the scope of an enclosing EXPLAIN)"
                );
            }

            let Plan::CreateMaterializedView(plan) =
                ddl::plan_create_materialized_view(scx, *stmt)?
            else {
                sql_bail!("expected CreateMaterializedViewPlan plan");
            };

            crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
                broken,
                plan,
            })
        }
        Explainee::CreateIndex(mut stmt, broken) => {
            // CREATE INDEX uses a boolean flag rather than IfExistsBehavior,
            // but the forcing logic is analogous.
            if !stmt.if_not_exists {
                stmt.if_not_exists = true;
            } else {
                sql_bail!(
                    "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
                 (the behavior is implied within the scope of an enclosing EXPLAIN)"
                );
            }

            let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
                sql_bail!("expected CreateIndexPlan plan");
            };

            crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
        }
        Explainee::Subscribe(stmt, broken) => {
            let Plan::Subscribe(plan) = plan_subscribe(scx, *stmt, params, None)? else {
                sql_bail!("expected SubscribePlan");
            };
            crate::plan::Explainee::Statement(ExplaineeStatement::Subscribe { broken, plan })
        }
    };

    Ok(explainee)
}
781
782pub fn plan_explain_plan(
783 scx: &StatementContext,
784 explain: ExplainPlanStatement<Aug>,
785 params: &Params,
786) -> Result<Plan, PlanError> {
787 let (format, verbose_syntax) = match explain.format() {
788 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
789 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
790 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
791 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
792 };
793 let stage = explain.stage();
794
795 let mut config = {
797 let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
798
799 if !scx.catalog.system_vars().persist_stats_filter_enabled() {
800 with_options.filter_pushdown = Some(false);
802 }
803
804 ExplainConfig::try_from(with_options)?
805 };
806 config.verbose_syntax = verbose_syntax;
807
808 let explainee = plan_explainee(scx, explain.explainee, params)?;
809
810 Ok(Plan::ExplainPlan(ExplainPlanPlan {
811 stage,
812 format,
813 config,
814 explainee,
815 }))
816}
817
/// Plans an `EXPLAIN SCHEMA` statement for a `CREATE SINK`.
///
/// Plans the sink under a placeholder name and extracts the Avro key or value
/// schema from the resulting Kafka sink connection. Only Kafka sinks with
/// Avro-formatted values are supported.
pub fn plan_explain_schema(
    scx: &StatementContext,
    explain_schema: ExplainSinkSchemaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let ExplainSinkSchemaStatement {
        schema_for,
        format: _,
        mut statement,
    } = explain_schema;

    // The sink statement inside EXPLAIN SCHEMA is unnamed; give it a dummy
    // catalog name so it can be planned.
    statement.name = Some(UnresolvedItemName::qualified(&[
        ident!("mz_catalog"),
        ident!("mz_explain_schema"),
    ]));

    // Resolve AVRO DOC ON options, which normally happens during statement
    // purification.
    crate::pure::purify_create_sink_avro_doc_on_options(
        scx.catalog,
        *statement.from.item_id(),
        &mut statement.format,
    )?;

    match ddl::plan_create_sink(scx, statement)? {
        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
            // Only Kafka sinks with an Avro value format expose a schema.
            StorageSinkConnection::Kafka(KafkaSinkConnection {
                format:
                    KafkaSinkFormat {
                        key_format,
                        value_format:
                            KafkaSinkFormatType::Avro {
                                schema: value_schema,
                                ..
                            },
                        ..
                    },
                ..
            }) => {
                let schema = match schema_for {
                    // The key schema exists only if the sink has an Avro key
                    // format.
                    ExplainSinkSchemaFor::Key => key_format
                        .and_then(|f| match f {
                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
                            _ => None,
                        })
                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
                    ExplainSinkSchemaFor::Value => value_schema,
                };

                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
                    sink_from: sink.from,
                    json_schema: schema,
                }))
            }
            _ => bail_unsupported!(
                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
            ),
        },
        _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
    }
}
880
881pub fn plan_explain_pushdown(
882 scx: &StatementContext,
883 statement: ExplainPushdownStatement<Aug>,
884 params: &Params,
885) -> Result<Plan, PlanError> {
886 scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
887 let explainee = plan_explainee(scx, statement.explainee, params)?;
888 Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
889}
890
891pub fn plan_explain_analyze_object(
892 scx: &StatementContext,
893 statement: ExplainAnalyzeObjectStatement<Aug>,
894 params: &Params,
895) -> Result<Plan, PlanError> {
896 let explainee_name = statement
897 .explainee
898 .name()
899 .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
900 .full_name_str();
901 let explainee = plan_explainee(scx, statement.explainee, params)?;
902
903 match explainee {
904 plan::Explainee::Index(_index_id) => (),
905 plan::Explainee::MaterializedView(_item_id) => (),
906 _ => {
907 return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
908 }
909 };
910
911 let mut ctes = Vec::with_capacity(4); let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
926 let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
927 let mut predicates = vec![format!("mo.name = '{}'", explainee_name)];
928 let mut order_by = vec!["mlm.lir_id DESC"];
929
930 match statement.properties {
931 ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
932 properties,
933 skew,
934 }) => {
935 let mut worker_id = None;
936 let mut seen_properties = BTreeSet::new();
937 for property in properties {
938 if !seen_properties.insert(property) {
940 continue;
941 }
942
943 match property {
944 ExplainAnalyzeComputationProperty::Memory => {
945 ctes.push((
946 "summary_memory",
947 r#"
948 SELECT mlm.global_id AS global_id,
949 mlm.lir_id AS lir_id,
950 SUM(mas.size) AS total_memory,
951 SUM(mas.records) AS total_records,
952 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
953 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
954 FROM mz_introspection.mz_lir_mapping mlm
955 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
956 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
957 ON (mas.operator_id = valid_id)
958GROUP BY mlm.global_id, mlm.lir_id"#,
959 ));
960 from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");
961
962 if skew {
963 ctes.push((
964 "per_worker_memory",
965 r#"
966 SELECT mlm.global_id AS global_id,
967 mlm.lir_id AS lir_id,
968 mas.worker_id AS worker_id,
969 SUM(mas.size) AS worker_memory,
970 SUM(mas.records) AS worker_records
971 FROM mz_introspection.mz_lir_mapping mlm
972 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
973 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
974 ON (mas.operator_id = valid_id)
975GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
976 ));
977 from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");
978
979 if let Some(worker_id) = worker_id {
980 predicates.push(format!("pwm.worker_id = {worker_id}"));
981 } else {
982 worker_id = Some("pwm.worker_id");
983 columns.push("pwm.worker_id AS worker_id");
984 order_by.push("worker_id");
985 }
986
987 columns.extend([
988 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
989 "pg_size_pretty(pwm.worker_memory) AS worker_memory",
990 "pg_size_pretty(sm.avg_memory) AS avg_memory",
991 "pg_size_pretty(sm.total_memory) AS total_memory",
992 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
993 "pwm.worker_records AS worker_records",
994 "sm.avg_records AS avg_records",
995 "sm.total_records AS total_records",
996 ]);
997 } else {
998 columns.extend([
999 "pg_size_pretty(sm.total_memory) AS total_memory",
1000 "sm.total_records AS total_records",
1001 ]);
1002 }
1003 }
1004 ExplainAnalyzeComputationProperty::Cpu => {
1005 ctes.push((
1006 "summary_cpu",
1007 r#"
1008 SELECT mlm.global_id AS global_id,
1009 mlm.lir_id AS lir_id,
1010 SUM(mse.elapsed_ns) AS total_ns,
1011 CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
1012 FROM mz_introspection.mz_lir_mapping mlm
1013 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1014 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1015 ON (mse.id = valid_id)
1016GROUP BY mlm.global_id, mlm.lir_id"#,
1017 ));
1018 from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");
1019
1020 if skew {
1021 ctes.push((
1022 "per_worker_cpu",
1023 r#"
1024 SELECT mlm.global_id AS global_id,
1025 mlm.lir_id AS lir_id,
1026 mse.worker_id AS worker_id,
1027 SUM(mse.elapsed_ns) AS worker_ns
1028 FROM mz_introspection.mz_lir_mapping mlm
1029 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1030 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1031 ON (mse.id = valid_id)
1032GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1033 ));
1034 from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");
1035
1036 if let Some(worker_id) = worker_id {
1037 predicates.push(format!("pwc.worker_id = {worker_id}"));
1038 } else {
1039 worker_id = Some("pwc.worker_id");
1040 columns.push("pwc.worker_id AS worker_id");
1041 order_by.push("worker_id");
1042 }
1043
1044 columns.extend([
1045 "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
1046 "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
1047 "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
1048 ]);
1049 }
1050 columns.push(
1051 "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
1052 );
1053 }
1054 }
1055 }
1056 }
1057 ExplainAnalyzeProperty::Hints => {
1058 columns.extend([
1059 "megsa.levels AS levels",
1060 "megsa.to_cut AS to_cut",
1061 "megsa.hint AS hint",
1062 "pg_size_pretty(megsa.savings) AS savings",
1063 ]);
1064 from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
1065 "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
1066 mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
1067 }
1068 }
1069
1070 from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");
1071
1072 let ctes = if !ctes.is_empty() {
1073 format!(
1074 "WITH {}",
1075 separated(
1076 ",\n",
1077 ctes.iter()
1078 .map(|(name, defn)| format!("{name} AS ({defn})"))
1079 )
1080 )
1081 } else {
1082 String::new()
1083 };
1084 let columns = separated(", ", columns);
1085 let from = separated(" ", from);
1086 let predicates = separated(" AND ", predicates);
1087 let order_by = separated(", ", order_by);
1088 let query = format!(
1089 r#"{ctes}
1090SELECT {columns}
1091FROM {from}
1092WHERE {predicates}
1093ORDER BY {order_by}"#
1094 );
1095
1096 if statement.as_sql {
1097 let rows = vec![Row::pack_slice(&[Datum::String(
1098 &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1099 PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1100 })?,
1101 )])];
1102 let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1103
1104 Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1105 } else {
1106 let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1107 show_select.plan()
1108 }
1109}
1110
/// Plans `EXPLAIN ANALYZE ... FOR CLUSTER`: builds an introspection SQL query
/// over `mz_introspection` relations that reports per-object memory and/or CPU
/// usage (optionally with per-worker skew), then either returns that SQL as a
/// string (`AS SQL`) or plans it like a SHOW statement.
pub fn plan_explain_analyze_cluster(
    scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
    _params: &Params,
) -> Result<Plan, PlanError> {
    // Fragments of the introspection query, assembled at the end.
    let mut ctes = Vec::with_capacity(4);
    let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
    let mut predicates = vec![];
    let mut order_by = vec![];

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
    // Records which alias (if any) already exposes the `worker_id` output
    // column, so a later skew-enabled property constrains its worker id to
    // match instead of adding a second column.
    let mut worker_id = None;
    let mut seen_properties = BTreeSet::new();
    for property in properties {
        // Skip properties that were listed more than once.
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            ExplainAnalyzeComputationProperty::Memory => {
                if skew {
                    // True iff this property introduced the `worker_id` column
                    // (and should therefore also order by it).
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        // Another property already selects a worker id;
                        // restrict ours to the same worker.
                        predicates.push(format!("om.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("om.worker_id");
                        columns.push("om.worker_id AS worker_id");
                        set_worker_id = true;
                    };

                    // Per-LIR-operator totals and per-worker averages of
                    // arrangement size/records.
                    ctes.push((
                        "per_operator_memory_summary",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mas.size) AS total_memory,
       SUM(mas.records) AS total_records,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
          ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    // Same, but broken out per worker (no averaging).
                    ctes.push((
                        "per_operator_memory_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mas.worker_id AS worker_id,
       SUM(mas.size) AS worker_memory,
       SUM(mas.records) AS worker_records
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
          ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                    ));

                    // Worker-vs-average ratio per operator (the skew signal).
                    ctes.push((
                        "per_operator_memory_ratios",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.lir_id AS lir_id,
       pompw.worker_id AS worker_id,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
FROM per_operator_memory_per_worker pompw
     JOIN per_operator_memory_summary poms
          USING (global_id, lir_id)
"#,
                    ));

                    // Roll operators up to (object, worker), keeping the worst
                    // per-operator ratio as the object's skew indicator.
                    ctes.push((
                        "object_memory",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.worker_id AS worker_id,
       MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
       MAX(pomr.records_ratio) AS max_operator_records_ratio,
       SUM(pompw.worker_memory) AS worker_memory,
       SUM(pompw.worker_records) AS worker_records
FROM per_operator_memory_per_worker pompw
     JOIN per_operator_memory_ratios pomr
          USING (global_id, worker_id, lir_id)
GROUP BY pompw.global_id, pompw.worker_id
"#,
                    ));

                    // Object-level totals and cross-worker averages.
                    ctes.push(("object_average_memory", r#"
SELECT om.global_id AS global_id,
       SUM(om.worker_memory) AS total_memory,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
       SUM(om.worker_records) AS total_records,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
FROM object_memory om
GROUP BY om.global_id"#));

                    from.push("LEFT JOIN object_memory om USING (global_id)");
                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");

                    columns.extend([
                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
                        "pg_size_pretty(om.worker_memory) AS worker_memory",
                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
                        "pg_size_pretty(oam.total_memory) AS total_memory",
                        "om.max_operator_records_ratio AS max_operator_records_ratio",
                        "om.worker_records AS worker_records",
                        "oam.avg_records AS avg_records",
                        "oam.total_records AS total_records",
                    ]);

                    // Most skewed / heaviest objects first.
                    order_by.extend([
                        "max_operator_memory_ratio DESC",
                        "max_operator_records_ratio DESC",
                        "om.worker_memory DESC",
                        "worker_records DESC",
                    ]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // Without SKEW: only object-level totals.
                    ctes.push((
                        "per_operator_memory_totals",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mas.size) AS total_memory,
       SUM(mas.records) AS total_records
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
          ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_memory_totals",
                        r#"
SELECT pomt.global_id AS global_id,
       SUM(pomt.total_memory) AS total_memory,
       SUM(pomt.total_records) AS total_records
FROM per_operator_memory_totals pomt
GROUP BY pomt.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
                    columns.extend([
                        "pg_size_pretty(omt.total_memory) AS total_memory",
                        "omt.total_records AS total_records",
                    ]);
                    order_by.extend(["omt.total_memory DESC", "total_records DESC"]);
                }
            }
            ExplainAnalyzeComputationProperty::Cpu => {
                // Mirrors the Memory arm, over scheduling elapsed time.
                if skew {
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        predicates.push(format!("oc.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("oc.worker_id");
                        columns.push("oc.worker_id AS worker_id");
                        set_worker_id = true;
                    };

                    ctes.push((
                        "per_operator_cpu_summary",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mse.elapsed_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
          ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "per_operator_cpu_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mse.worker_id AS worker_id,
       SUM(mse.elapsed_ns) AS worker_ns
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
          ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                    ));

                    ctes.push((
                        "per_operator_cpu_ratios",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.lir_id AS lir_id,
       pocpw.worker_id AS worker_id,
       CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
FROM per_operator_cpu_per_worker pocpw
     JOIN per_operator_cpu_summary pocs
          USING (global_id, lir_id)
"#,
                    ));

                    // NOTE(review): the ratios CTE is aliased `pomr` here,
                    // matching the memory variant's alias rather than `pocr`;
                    // cosmetic only, but worth knowing when grepping.
                    ctes.push((
                        "object_cpu",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.worker_id AS worker_id,
       MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
       SUM(pocpw.worker_ns) AS worker_ns
FROM per_operator_cpu_per_worker pocpw
     JOIN per_operator_cpu_ratios pomr
          USING (global_id, worker_id, lir_id)
GROUP BY pocpw.global_id, pocpw.worker_id
"#,
                    ));

                    ctes.push((
                        "object_average_cpu",
                        r#"
SELECT oc.global_id AS global_id,
       SUM(oc.worker_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
FROM object_cpu oc
GROUP BY oc.global_id"#,
                    ));

                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");

                    // elapsed_ns / 1000 microseconds -> INTERVAL for display.
                    columns.extend([
                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
                    ]);

                    order_by.extend(["max_operator_cpu_ratio DESC", "worker_elapsed DESC"]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    ctes.push((
                        "per_operator_cpu_totals",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mse.elapsed_ns) AS total_ns
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
          ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_cpu_totals",
                        r#"
SELECT poct.global_id AS global_id,
       SUM(poct.total_ns) AS total_ns
FROM per_operator_cpu_totals poct
GROUP BY poct.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
                    columns
                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
                    order_by.extend(["total_elapsed DESC"]);
                }
            }
        }
    }

    // Stitch the fragments into a single SELECT statement.
    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    // Unlike the other clauses, WHERE must be omitted entirely when empty.
    let predicates = if !predicates.is_empty() {
        format!("WHERE {}", separated(" AND ", predicates))
    } else {
        String::new()
    };
    // Break ordering ties by object name.
    order_by.push("mo.name DESC");
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
{predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // AS SQL: return the generated introspection query itself, as a
        // single pretty-printed string row.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        // Otherwise, plan the generated query like a SHOW statement.
        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        show_select.plan()
    }
}
1476
1477pub fn plan_explain_timestamp(
1478 scx: &StatementContext,
1479 explain: ExplainTimestampStatement<Aug>,
1480) -> Result<Plan, PlanError> {
1481 let (format, _verbose_syntax) = match explain.format() {
1482 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1483 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1484 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1485 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1486 };
1487
1488 let raw_plan = {
1489 let query::PlannedRootQuery {
1490 expr: raw_plan,
1491 desc: _,
1492 finishing: _,
1493 scope: _,
1494 } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1495 if raw_plan.contains_parameters()? {
1496 return Err(PlanError::ParameterNotAllowed(
1497 "EXPLAIN TIMESTAMP".to_string(),
1498 ));
1499 }
1500
1501 raw_plan
1502 };
1503 let when = query::plan_as_of(scx, explain.select.as_of)?;
1504
1505 Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1506 format,
1507 raw_plan,
1508 when,
1509 }))
1510}
1511
1512#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."]
1515pub fn plan_query(
1516 scx: &StatementContext,
1517 query: Query<Aug>,
1518 params: &Params,
1519 lifetime: QueryLifetime,
1520) -> Result<query::PlannedRootQuery<MirRelationExpr>, PlanError> {
1521 let query::PlannedRootQuery {
1522 mut expr,
1523 desc,
1524 finishing,
1525 scope,
1526 } = query::plan_root_query(scx, query, lifetime)?;
1527 expr.bind_parameters(scx, lifetime, params)?;
1528
1529 Ok(query::PlannedRootQuery {
1530 expr: expr.lower(scx.catalog.system_vars(), None)?,
1532 desc,
1533 finishing,
1534 scope,
1535 })
1536}
1537
// Typed extraction of SUBSCRIBE options: generates `SubscribeOptionExtracted`
// with optional `snapshot` and `progress` boolean fields.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1539
/// Computes the output `StatementDesc` for a SUBSCRIBE statement. The output
/// always begins with an `mz_timestamp` column; the remaining columns depend
/// on the PROGRESS option and the chosen output envelope.
pub fn describe_subscribe(
    scx: &StatementContext,
    stmt: SubscribeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // Resolve the description of the subscribed relation: a named catalog
    // item (which must have a relation description) or an inline query.
    let relation_desc = match stmt.relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            match item.relation_desc() {
                Some(desc) => desc.into_owned(),
                None => sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                ),
            }
        }
        SubscribeRelation::Query(query) => {
            let query::PlannedRootQuery { desc, .. } =
                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
            desc
        }
    };
    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
    let progress = progress.unwrap_or(false);
    // Every SUBSCRIBE row starts with a zero-scale numeric `mz_timestamp`.
    let mut desc = RelationDesc::builder().with_column(
        "mz_timestamp",
        SqlScalarType::Numeric {
            max_scale: Some(NumericMaxScale::ZERO),
        }
        .nullable(false),
    );
    // With PROGRESS, add `mz_progressed`; data columns become nullable below
    // since progress rows carry no data.
    if progress {
        desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
    }

    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
    match stmt.output {
        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
            // Diff output: `mz_diff` followed by the relation's own columns.
            desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
            for (name, mut ty) in relation_desc.into_iter() {
                if progress {
                    ty.nullable = true;
                }
                desc = desc.with_column(name, ty);
            }
        }
        SubscribeOutput::EnvelopeUpsert { key_columns }
        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
            // Envelope output: `mz_state`, then the key columns, then the
            // value columns (doubled into before_/after_ for Debezium).
            desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
            let key_columns = key_columns
                .into_iter()
                .map(normalize::column_name)
                .collect_vec();
            let mut before_values_desc = RelationDesc::builder();
            let mut after_values_desc = RelationDesc::builder();

            // Key columns appear once, in the order given in the statement;
            // an unknown key name is a planning error.
            for column_name in &key_columns {
                let mut column_ty = relation_desc
                    .get_by_name(column_name)
                    .map(|(_pos, ty)| ty.clone())
                    .ok_or_else(|| PlanError::UnknownColumn {
                        table: None,
                        column: column_name.clone(),
                        similar: Box::new([]),
                    })?;
                if progress {
                    column_ty.nullable = true;
                }
                desc = desc.with_column(column_name, column_ty);
            }

            // Non-key columns are always nullable in envelope output.
            for (mut name, mut ty) in relation_desc
                .into_iter()
                .filter(|(name, _ty)| !key_columns.contains(name))
            {
                ty.nullable = true;
                before_values_desc =
                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
                // Only Debezium renames the value columns to `after_*`.
                if debezium {
                    name = format!("after_{}", name).into();
                }
                after_values_desc = after_values_desc.with_column(name, ty);
            }

            // Only Debezium output carries the `before_*` columns.
            if debezium {
                desc = desc.concat(before_values_desc);
            }
            desc = desc.concat(after_values_desc);
        }
    }
    Ok(StatementDesc::new(Some(desc.finish())))
}
1635
/// Plans a SUBSCRIBE statement: resolves the subscription source (named item
/// or ad-hoc query), the `AS OF`/`UP TO` frontiers, and the output envelope
/// (diffs, upsert, Debezium, or WITHIN TIMESTAMP ORDER BY).
pub fn plan_subscribe(
    scx: &StatementContext,
    SubscribeStatement {
        relation,
        options,
        as_of,
        up_to,
        output,
    }: SubscribeStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<Plan, PlanError> {
    // Resolve what we subscribe to, along with its description and a scope
    // for planning envelope key / order-by expressions against its columns.
    let (from, desc, scope) = match relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            let Some(desc) = item.relation_desc() else {
                sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                );
            };
            let item_name = match name {
                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
                _ => None,
            };
            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
            (
                SubscribeFrom::Id(item.global_id()),
                desc.into_owned(),
                scope,
            )
        }
        SubscribeRelation::Query(query) => {
            // `plan_query` is deprecated but still the lowering path here.
            #[allow(deprecated)]
            let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?;
            // A Subscribe-lifetime plan must come back with a trivial
            // finishing (no residual ORDER BY/LIMIT/OFFSET to apply).
            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
                &query.finishing,
                query.desc.arity()
            ));
            let desc = query.desc.clone();
            (
                SubscribeFrom::Query {
                    expr: query.expr,
                    desc: query.desc,
                },
                desc,
                query.scope,
            )
        }
    };

    let when = query::plan_as_of(scx, as_of)?;
    let up_to = up_to
        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
        .transpose()?;

    // Expression context for planning envelope clauses over the subscribed
    // relation's columns.
    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
    let ecx = ExprContext {
        qcx: &qcx,
        name: "",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
    let output = match output {
        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
        SubscribeOutput::EnvelopeUpsert { key_columns } => {
            // Plan the key columns as if they were ORDER BY expressions.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE UPSERT KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            // Any required map expression means a key was not a plain
            // existing column, which is not allowed.
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
            }
            plan::SubscribeOutput::EnvelopeUpsert {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::EnvelopeDebezium { key_columns } => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
            // Same planning scheme as ENVELOPE UPSERT above.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE DEBEZIUM KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
            }
            plan::SubscribeOutput::EnvelopeDebezium {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
            // `mz_diff` is orderable too: prepend it as column 0 and shift
            // the relation's columns up by one.
            let mz_diff = "mz_diff".into();
            let output_columns = std::iter::once((0, &mz_diff))
                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
                .collect_vec();
            match query::plan_order_by_exprs(
                &ExprContext {
                    name: "WITHIN TIMESTAMP ORDER BY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            ) {
                // Surface an unknown-column error specifically on `mz_diff`
                // as an invalid ORDER BY for this output mode.
                Err(PlanError::UnknownColumn {
                    table: None,
                    column,
                    similar: _,
                }) if &column == &mz_diff => {
                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                }
                Err(e) => return Err(e),
                Ok((order_by, map_exprs)) => {
                    // As with the envelopes: computed expressions are not
                    // allowed, only existing columns.
                    if !map_exprs.is_empty() {
                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                    }

                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
                }
            }
        }
    };

    let SubscribeOptionExtracted {
        progress, snapshot, ..
    } = options.try_into()?;
    Ok(Plan::Subscribe(SubscribePlan {
        from,
        when,
        up_to,
        // Defaults: snapshot on, progress off.
        with_snapshot: snapshot.unwrap_or(true),
        copy_to,
        emit_progress: progress.unwrap_or(false),
        output,
    }))
}
1808
1809pub fn describe_copy_from_table(
1810 scx: &StatementContext,
1811 table_name: <Aug as AstInfo>::ItemName,
1812 columns: Vec<Ident>,
1813) -> Result<StatementDesc, PlanError> {
1814 let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1815 Ok(StatementDesc::new(Some(desc)))
1816}
1817
1818pub fn describe_copy_item(
1819 scx: &StatementContext,
1820 object_name: <Aug as AstInfo>::ItemName,
1821 columns: Vec<Ident>,
1822) -> Result<StatementDesc, PlanError> {
1823 let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1824 Ok(StatementDesc::new(Some(desc)))
1825}
1826
1827pub fn describe_copy(
1828 scx: &StatementContext,
1829 CopyStatement {
1830 relation,
1831 direction,
1832 ..
1833 }: CopyStatement<Aug>,
1834) -> Result<StatementDesc, PlanError> {
1835 Ok(match (relation, direction) {
1836 (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1837 describe_copy_item(scx, name, columns)?
1838 }
1839 (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1840 describe_copy_from_table(scx, name, columns)?
1841 }
1842 (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1843 (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1844 }
1845 .with_is_copy())
1846}
1847
/// Plans `COPY (SELECT ...) TO <expr>`: validates the AWS connection, the
/// output format, and the MAX FILE SIZE bounds, and plans the target
/// expression as a string scalar.
fn plan_copy_to_expr(
    scx: &StatementContext,
    select_plan: SelectPlan,
    desc: RelationDesc,
    to: &Expr<Aug>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // COPY ... TO <expr> writes to S3, so an AWS connection is mandatory.
    let conn_id = match options.aws_connection {
        Some(conn_id) => CatalogItemId::from(conn_id),
        None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
    };
    let connection = scx.get_item(&conn_id).connection()?;

    match connection {
        mz_storage_types::connections::Connection::Aws(_) => {}
        _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
    }

    // Translate the requested COPY format into an S3 sink format.
    let format = match format {
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            S3SinkFormat::PgCopy(CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            ))
        }
        CopyFormat::Parquet => {
            // Ensure all of the relation's types are representable in Arrow.
            ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
            S3SinkFormat::Parquet
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
    };

    // Plan the target expression as a string scalar in an empty scope: no
    // aggregates, subqueries, parameters, or window functions allowed.
    let mut to_expr = to.clone();
    transform_ast::transform(scx, &mut to_expr)?;
    let relation_type = RelationDesc::empty();
    let ecx = &ExprContext {
        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
        name: "COPY TO target",
        scope: &Scope::empty(),
        relation_type: relation_type.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };

    let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;

    // Enforce the supported MAX FILE SIZE range for S3 sink files.
    if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
        sql_bail!(
            "MAX FILE SIZE cannot be less than {}",
            MIN_S3_SINK_FILE_SIZE
        );
    }
    if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
        sql_bail!(
            "MAX FILE SIZE cannot be greater than {}",
            MAX_S3_SINK_FILE_SIZE
        );
    }

    Ok(Plan::CopyTo(CopyToPlan {
        select_plan,
        desc,
        to,
        connection: connection.to_owned(),
        connection_id: conn_id,
        format,
        max_file_size: options.max_file_size.as_bytes(),
    }))
}
1932
/// Plans `COPY ... FROM`: determines the data source (STDIN, URL, or S3),
/// builds the per-format decode parameters, validates the FILES/PATTERN
/// filter, and resolves the target table.
fn plan_copy_from(
    scx: &StatementContext,
    target: &CopyTarget<Aug>,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // Rejects an option that is only meaningful in CSV mode.
    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
        match option {
            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
            None => Ok(()),
        }
    }

    // Determine where the data comes from.
    let source = match target {
        CopyTarget::Stdin => CopyFromSource::Stdin,
        CopyTarget::Expr(from) => {
            // Remote COPY FROM is feature-gated.
            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;

            // Plan the source expression as a string scalar in an empty
            // scope (no aggregates/subqueries/parameters/windows).
            let mut from_expr = from.clone();
            transform_ast::transform(scx, &mut from_expr)?;
            let relation_type = RelationDesc::empty();
            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "COPY FROM target",
                scope: &Scope::empty(),
                relation_type: relation_type.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;

            match options.aws_connection {
                Some(conn_id) => {
                    let conn_id = CatalogItemId::from(conn_id);

                    // With an AWS connection the expression names an S3 uri.
                    let connection = match scx.get_item(&conn_id).connection()? {
                        mz_storage_types::connections::Connection::Aws(conn) => conn,
                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
                    };

                    CopyFromSource::AwsS3 {
                        uri: from,
                        connection,
                        connection_id: conn_id,
                    }
                }
                // Without a connection, treat the expression as a plain URL.
                None => CopyFromSource::Url(from),
            }
        }
        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
    };

    // Build the per-format decode parameters.
    let params = match format {
        CopyFormat::Text => {
            // quote/escape/HEADER are CSV-only options.
            only_available_with_csv(options.quote, "quote")?;
            only_available_with_csv(options.escape, "escape")?;
            only_available_with_csv(options.header, "HEADER")?;
            // Text defaults: tab delimiter, `\N` for NULL.
            let delimiter =
                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
            let null = match options.null {
                Some(null) => Cow::from(null),
                None => Cow::from("\\N"),
            };
            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
        }
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            )
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Parquet => CopyFormatParams::Parquet,
    };

    // FILES and PATTERN are mutually exclusive object filters.
    let filter = match (options.files, options.pattern) {
        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
        (Some(files), None) => Some(CopyFromFilter::Files(files)),
        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
        (None, None) => None,
    };

    // Filters only make sense for remote sources, not STDIN.
    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
    }

    let table_name_string = table_name.full_name_str();

    // Resolve the target table, the description of the incoming data, and
    // the MFP that maps source rows into the table's shape.
    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;

    let Some(mfp) = maybe_mfp else {
        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
    };

    Ok(Plan::CopyFrom(CopyFromPlan {
        target_id: id,
        target_name: table_name_string,
        source,
        columns,
        source_desc,
        mfp,
        params,
        filter,
    }))
}
2053
2054fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2055 match v {
2056 Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2057 Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2058 None => Ok(None),
2059 }
2060}
2061
// Generates `CopyOptionExtracted`, the typed extraction of the `WITH (...)`
// options accepted by `COPY` (consumed via `CopyOptionExtracted::try_from` in
// `plan_copy` below). Each tuple is `(option name, value type[, default])`;
// `MaxFileSize` defaults to 256 MB when unspecified.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
2075
2076pub fn plan_copy(
2077 scx: &StatementContext,
2078 CopyStatement {
2079 relation,
2080 direction,
2081 target,
2082 options,
2083 }: CopyStatement<Aug>,
2084) -> Result<Plan, PlanError> {
2085 let options = CopyOptionExtracted::try_from(options)?;
2086 let format = options
2089 .format
2090 .as_ref()
2091 .map(|format| match format.to_lowercase().as_str() {
2092 "text" => Ok(CopyFormat::Text),
2093 "csv" => Ok(CopyFormat::Csv),
2094 "binary" => Ok(CopyFormat::Binary),
2095 "parquet" => Ok(CopyFormat::Parquet),
2096 _ => sql_bail!("unknown FORMAT: {}", format),
2097 })
2098 .transpose()?;
2099
2100 match (&direction, &target) {
2101 (CopyDirection::To, CopyTarget::Stdout) => {
2102 if options.delimiter.is_some() {
2103 sql_bail!("COPY TO does not support DELIMITER option yet");
2104 }
2105 if options.quote.is_some() {
2106 sql_bail!("COPY TO does not support QUOTE option yet");
2107 }
2108 if options.null.is_some() {
2109 sql_bail!("COPY TO does not support NULL option yet");
2110 }
2111 match relation {
2112 CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
2113 CopyRelation::Select(stmt) => Ok(plan_select(
2114 scx,
2115 stmt,
2116 &Params::empty(),
2117 Some(format.unwrap_or(CopyFormat::Text)),
2118 )?),
2119 CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
2120 scx,
2121 stmt,
2122 &Params::empty(),
2123 Some(format.unwrap_or(CopyFormat::Text)),
2124 )?),
2125 }
2126 }
2127 (CopyDirection::From, target) => match relation {
2128 CopyRelation::Named { name, columns } => plan_copy_from(
2129 scx,
2130 target,
2131 name,
2132 columns,
2133 format.unwrap_or(CopyFormat::Text),
2134 options,
2135 ),
2136 _ => sql_bail!("COPY FROM {} not supported", target),
2137 },
2138 (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
2139 if !scx.catalog.active_role_id().is_system() {
2144 scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
2145 }
2146
2147 let format = match format {
2148 Some(inner) => inner,
2149 _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
2150 };
2151
2152 let stmt = match relation {
2153 CopyRelation::Named { name, columns } => {
2154 if !columns.is_empty() {
2155 sql_bail!(
2157 "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
2158 );
2159 }
2160 let query = Query {
2162 ctes: CteBlock::empty(),
2163 body: SetExpr::Table(name),
2164 order_by: vec![],
2165 limit: None,
2166 offset: None,
2167 };
2168 SelectStatement { query, as_of: None }
2169 }
2170 CopyRelation::Select(stmt) => {
2171 if !stmt.query.order_by.is_empty() {
2172 sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
2173 }
2174 stmt
2175 }
2176 _ => sql_bail!("COPY {} {} not supported", direction, target),
2177 };
2178
2179 let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
2180 plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
2181 }
2182 _ => sql_bail!("COPY {} {} not supported", direction, target),
2183 }
2184}