1use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19
20use mz_arrow_util::builder::ArrowBuilder;
21use mz_expr::visit::Visit;
22use mz_expr::{MirRelationExpr, RowSetFinishing};
23use mz_ore::num::NonNeg;
24use mz_ore::soft_panic_or_log;
25use mz_ore::str::separated;
26use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
27use mz_repr::adt::numeric::NumericMaxScale;
28use mz_repr::bytes::ByteSize;
29use mz_repr::explain::{ExplainConfig, ExplainFormat};
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
32use mz_sql_parser::ast::{
33 CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
34 ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
35 ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
36 ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
37 SetExpr, SubscribeOutput, UnresolvedItemName,
38};
39use mz_sql_parser::ident;
40use mz_storage_types::sinks::{
41 KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
42 MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
43};
44
45use crate::ast::display::AstDisplay;
46use crate::ast::{
47 AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
48 DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
49 SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
50 UpdateStatement,
51};
52use crate::catalog::CatalogItemType;
53use crate::names::{Aug, ResolvedItemName};
54use crate::normalize;
55use crate::plan::query::{
56 ExprContext, QueryLifetime, offset_into_value, plan_as_of_or_up_to, plan_expr,
57};
58use crate::plan::scope::Scope;
59use crate::plan::statement::show::ShowSelect;
60use crate::plan::statement::{StatementContext, StatementDesc, ddl};
61use crate::plan::{
62 self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
63 ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
64};
65use crate::plan::{
66 CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
67 QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
68};
69use crate::plan::{CopyFromSource, with_options};
70use crate::session::vars::{self, ENABLE_COPY_FROM_REMOTE};
71
72pub fn describe_insert(
79 scx: &StatementContext,
80 InsertStatement {
81 table_name,
82 columns,
83 source,
84 returning,
85 }: InsertStatement<Aug>,
86) -> Result<StatementDesc, PlanError> {
87 let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
88 let desc = if returning.expr.is_empty() {
89 None
90 } else {
91 Some(returning.desc)
92 };
93 Ok(StatementDesc::new(desc))
94}
95
96pub fn plan_insert(
97 scx: &StatementContext,
98 InsertStatement {
99 table_name,
100 columns,
101 source,
102 returning,
103 }: InsertStatement<Aug>,
104 params: &Params,
105) -> Result<Plan, PlanError> {
106 let (id, mut expr, returning) =
107 query::plan_insert_query(scx, table_name, columns, source, returning)?;
108 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
109 let returning = returning
110 .expr
111 .into_iter()
112 .map(|mut expr| {
113 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
114 expr.lower_uncorrelated(scx.catalog.system_vars())
115 })
116 .collect::<Result<Vec<_>, _>>()?;
117
118 Ok(Plan::Insert(InsertPlan {
119 id,
120 values: expr,
121 returning,
122 }))
123}
124
125pub fn describe_delete(
126 scx: &StatementContext,
127 stmt: DeleteStatement<Aug>,
128) -> Result<StatementDesc, PlanError> {
129 query::plan_delete_query(scx, stmt)?;
130 Ok(StatementDesc::new(None))
131}
132
133pub fn plan_delete(
134 scx: &StatementContext,
135 stmt: DeleteStatement<Aug>,
136 params: &Params,
137) -> Result<Plan, PlanError> {
138 let rtw_plan = query::plan_delete_query(scx, stmt)?;
139 plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
140}
141
142pub fn describe_update(
143 scx: &StatementContext,
144 stmt: UpdateStatement<Aug>,
145) -> Result<StatementDesc, PlanError> {
146 query::plan_update_query(scx, stmt)?;
147 Ok(StatementDesc::new(None))
148}
149
150pub fn plan_update(
151 scx: &StatementContext,
152 stmt: UpdateStatement<Aug>,
153 params: &Params,
154) -> Result<Plan, PlanError> {
155 let rtw_plan = query::plan_update_query(scx, stmt)?;
156 plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
157}
158
159pub fn plan_read_then_write(
160 scx: &StatementContext,
161 kind: MutationKind,
162 params: &Params,
163 query::ReadThenWritePlan {
164 id,
165 mut selection,
166 finishing,
167 assignments,
168 }: query::ReadThenWritePlan,
169) -> Result<Plan, PlanError> {
170 selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
171 let mut assignments_outer = BTreeMap::new();
172 for (idx, mut set) in assignments {
173 set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
174 let set = set.lower_uncorrelated(scx.catalog.system_vars())?;
175 assignments_outer.insert(idx, set);
176 }
177
178 Ok(Plan::ReadThenWrite(ReadThenWritePlan {
179 id,
180 selection,
181 finishing,
182 assignments: assignments_outer,
183 kind,
184 returning: Vec::new(),
185 }))
186}
187
188pub fn describe_select(
189 scx: &StatementContext,
190 stmt: SelectStatement<Aug>,
191) -> Result<StatementDesc, PlanError> {
192 if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
193 return Ok(StatementDesc::new(Some(desc)));
194 }
195
196 let query::PlannedRootQuery { desc, .. } =
197 query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
198 Ok(StatementDesc::new(Some(desc)))
199}
200
201pub fn plan_select(
202 scx: &StatementContext,
203 select: SelectStatement<Aug>,
204 params: &Params,
205 copy_to: Option<CopyFormat>,
206) -> Result<Plan, PlanError> {
207 if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
208 return Ok(Plan::SideEffectingFunc(f));
209 }
210
211 let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
212 Ok(Plan::Select(plan))
213}
214
/// Plans a `SELECT` into a [`SelectPlan`] plus the relation description of its
/// output.
///
/// Responsibilities beyond root-query planning:
/// - resolves the `AS OF` clause into the query timestamp (`when`);
/// - binds parameters into the query body and the finishing clauses;
/// - forces every `TopK` `OFFSET` and the top-level `LIMIT`/`OFFSET` down to
///   constant values, failing with a user error otherwise.
fn plan_select_inner(
    scx: &StatementContext,
    select: SelectStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<(SelectPlan, RelationDesc), PlanError> {
    let when = query::plan_as_of(scx, select.as_of.clone())?;
    let lifetime = QueryLifetime::OneShot;
    let query::PlannedRootQuery {
        mut expr,
        desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
    expr.bind_parameters(scx, lifetime, params)?;

    // Replace each TopK's OFFSET expression with the Int64 literal it
    // evaluates to; `offset_into_value` errors if it is not a valid constant.
    expr.try_visit_mut_pre(&mut |expr| {
        if let HirRelationExpr::TopK { offset, .. } = expr {
            let offset_value = offset_into_value(offset.take())?;
            *offset = HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64);
        }
        Ok::<(), PlanError>(())
    })?;
    // The top-level LIMIT must reduce to a literal: either NULL (no limit) or
    // a non-negative Int64.
    let limit = match finishing.limit {
        None => None,
        Some(mut limit) => {
            limit.bind_parameters(scx, lifetime, params)?;
            let Some(limit) = limit.as_literal() else {
                sql_bail!(
                    "Top-level LIMIT must be a constant expression, got {}",
                    limit
                )
            };
            match limit {
                Datum::Null => None,
                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
                _ => {
                    // Earlier validation should have rejected this; log loudly
                    // in debug builds but still surface a user-facing error.
                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
                    sql_bail!("LIMIT must be a non-negative INT or NULL")
                }
            }
        }
    };
    // The top-level OFFSET likewise reduces to a constant; `offset_into_value`
    // guarantees non-negativity, justifying the `expect` below.
    let offset = {
        let mut offset = finishing.offset.clone();
        offset.bind_parameters(scx, lifetime, params)?;
        let offset = offset_into_value(offset.take())?;
        offset
            .try_into()
            .expect("checked in offset_into_value that it is not negative")
    };

    let plan = SelectPlan {
        source: expr,
        when,
        finishing: RowSetFinishing {
            limit,
            offset,
            project: finishing.project,
            order_by: finishing.order_by,
        },
        copy_to,
        // Retain the original AST for later use (e.g. plan insights/EXPLAIN).
        select: Some(Box::new(select)),
    };

    Ok((plan, desc))
}
294
295pub fn describe_explain_plan(
296 scx: &StatementContext,
297 explain: ExplainPlanStatement<Aug>,
298) -> Result<StatementDesc, PlanError> {
299 let mut relation_desc = RelationDesc::builder();
300
301 match explain.stage() {
302 ExplainStage::RawPlan => {
303 let name = "Raw Plan";
304 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
305 }
306 ExplainStage::DecorrelatedPlan => {
307 let name = "Decorrelated Plan";
308 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
309 }
310 ExplainStage::LocalPlan => {
311 let name = "Locally Optimized Plan";
312 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
313 }
314 ExplainStage::GlobalPlan => {
315 let name = "Optimized Plan";
316 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
317 }
318 ExplainStage::PhysicalPlan => {
319 let name = "Physical Plan";
320 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
321 }
322 ExplainStage::Trace => {
323 relation_desc = relation_desc
324 .with_column("Time", SqlScalarType::UInt64.nullable(false))
325 .with_column("Path", SqlScalarType::String.nullable(false))
326 .with_column("Plan", SqlScalarType::String.nullable(false));
327 }
328 ExplainStage::PlanInsights => {
329 let name = "Plan Insights";
330 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
331 }
332 };
333 let relation_desc = relation_desc.finish();
334
335 Ok(
336 StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
337 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
338 _ => vec![],
339 }),
340 )
341}
342
343pub fn describe_explain_pushdown(
344 scx: &StatementContext,
345 statement: ExplainPushdownStatement<Aug>,
346) -> Result<StatementDesc, PlanError> {
347 let relation_desc = RelationDesc::builder()
348 .with_column("Source", SqlScalarType::String.nullable(false))
349 .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
350 .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
351 .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
352 .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
353 .finish();
354
355 Ok(
356 StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
357 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
358 _ => vec![],
359 }),
360 )
361}
362
/// Describes the output relation of `EXPLAIN ANALYZE` on a single object.
///
/// The column set depends on the requested properties (memory/CPU vs. hints)
/// and on whether per-worker skew columns were requested. With `AS SQL`, the
/// output is just the SQL text that would have been run.
pub fn describe_explain_analyze_object(
    _scx: &StatementContext,
    statement: ExplainAnalyzeObjectStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // AS SQL short-circuits: a single text column holding the generated SQL.
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    match statement.properties {
        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
            properties,
            skew,
        }) => {
            // Every computation report starts with the operator column.
            let mut relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(false));

            // Skew reports additionally carry the worker the row refers to.
            if skew {
                relation_desc =
                    relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
            }

            // Deduplicate repeated properties while preserving the order in
            // which they first appear, so column order follows the statement.
            let mut seen_properties = BTreeSet::new();
            for property in properties {
                if !seen_properties.insert(property) {
                    continue;
                }

                match property {
                    // Memory with skew: per-worker vs. average ratios plus
                    // worker/average/total figures for both size and records.
                    ExplainAnalyzeComputationProperty::Memory if skew => {
                        let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                        relation_desc = relation_desc
                            .with_column("memory_ratio", numeric.clone())
                            .with_column("worker_memory", SqlScalarType::String.nullable(true))
                            .with_column("avg_memory", SqlScalarType::String.nullable(true))
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column("records_ratio", numeric.clone())
                            .with_column("worker_records", numeric.clone())
                            .with_column("avg_records", numeric.clone())
                            .with_column("total_records", numeric);
                    }
                    // Memory without skew: totals only.
                    ExplainAnalyzeComputationProperty::Memory => {
                        relation_desc = relation_desc
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column(
                                "total_records",
                                SqlScalarType::Numeric { max_scale: None }.nullable(true),
                            );
                    }
                    // CPU: skew adds ratio and per-worker/average elapsed
                    // columns; total elapsed is always present.
                    ExplainAnalyzeComputationProperty::Cpu => {
                        if skew {
                            relation_desc = relation_desc
                                .with_column(
                                    "cpu_ratio",
                                    SqlScalarType::Numeric { max_scale: None }.nullable(true),
                                )
                                .with_column(
                                    "worker_elapsed",
                                    SqlScalarType::Interval.nullable(true),
                                )
                                .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
                        }
                        relation_desc = relation_desc
                            .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
                    }
                }
            }

            let relation_desc = relation_desc.finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
        // Hints: fixed schema for group-size advice output.
        ExplainAnalyzeProperty::Hints => {
            let relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(true))
                .with_column("levels", SqlScalarType::Int64.nullable(true))
                .with_column("to_cut", SqlScalarType::Int64.nullable(true))
                .with_column("hint", SqlScalarType::Float64.nullable(true))
                .with_column("savings", SqlScalarType::String.nullable(true))
                .finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
    }
}
449
/// Describes the output relation of `EXPLAIN ANALYZE` on a whole cluster.
///
/// Mirrors [`describe_explain_analyze_object`] but reports per-object rows
/// (`object`, `global_id`) and worst-operator ratios instead of per-operator
/// rows. With `AS SQL`, the output is just the generated SQL text.
pub fn describe_explain_analyze_cluster(
    _scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
) -> Result<StatementDesc, PlanError> {
    // AS SQL short-circuits: a single text column holding the generated SQL.
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;

    // Every row identifies the object being reported on.
    let mut relation_desc = RelationDesc::builder()
        .with_column("object", SqlScalarType::String.nullable(false))
        .with_column("global_id", SqlScalarType::String.nullable(false));

    // Skew reports additionally carry the worker the row refers to.
    if skew {
        relation_desc =
            relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
    }

    // Deduplicate repeated properties while preserving the order in which
    // they first appear, so column order follows the statement.
    let mut seen_properties = BTreeSet::new();
    for property in properties {
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            // Memory with skew: worst-operator ratios plus worker/average/
            // total figures for both size and records.
            ExplainAnalyzeComputationProperty::Memory if skew => {
                let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                relation_desc = relation_desc
                    .with_column("max_operator_memory_ratio", numeric.clone())
                    .with_column("worker_memory", SqlScalarType::String.nullable(true))
                    .with_column("avg_memory", SqlScalarType::String.nullable(true))
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column("max_operator_records_ratio", numeric.clone())
                    .with_column("worker_records", numeric.clone())
                    .with_column("avg_records", numeric.clone())
                    .with_column("total_records", numeric);
            }
            // Memory without skew: totals only.
            ExplainAnalyzeComputationProperty::Memory => {
                relation_desc = relation_desc
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column(
                        "total_records",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    );
            }
            // CPU with skew: worst-operator ratio plus worker/average/total
            // elapsed time.
            ExplainAnalyzeComputationProperty::Cpu if skew => {
                relation_desc = relation_desc
                    .with_column(
                        "max_operator_cpu_ratio",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    )
                    .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
            // CPU without skew: total elapsed time only.
            ExplainAnalyzeComputationProperty::Cpu => {
                relation_desc = relation_desc
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
        }
    }

    Ok(StatementDesc::new(Some(relation_desc.finish())))
}
519
520pub fn describe_explain_timestamp(
521 scx: &StatementContext,
522 ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
523) -> Result<StatementDesc, PlanError> {
524 let relation_desc = RelationDesc::builder()
525 .with_column("Timestamp", SqlScalarType::String.nullable(false))
526 .finish();
527
528 Ok(StatementDesc::new(Some(relation_desc))
529 .with_params(describe_select(scx, select)?.param_types))
530}
531
532pub fn describe_explain_schema(
533 _: &StatementContext,
534 ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
535) -> Result<StatementDesc, PlanError> {
536 let relation_desc = RelationDesc::builder()
537 .with_column("Schema", SqlScalarType::String.nullable(false))
538 .finish();
539 Ok(StatementDesc::new(Some(relation_desc)))
540}
541
// Generates `ExplainPlanOptionExtracted`, which collects the `WITH (...)`
// options accepted by `EXPLAIN PLAN` into typed fields with the defaults
// listed below. `Option<bool>` entries distinguish "unset" from an explicit
// `false`, so later code can apply environment-dependent defaults.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
584
impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
    type Error = PlanError;

    /// Converts the extracted `EXPLAIN PLAN` options into an [`ExplainConfig`].
    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
        // RAW is shorthand for RAW PLANS + RAW SYNTAX.
        if v.raw {
            v.raw_plans = true;
            v.raw_syntax = true;
        }

        // Some unset options default to enabled only in production builds
        // (i.e. when soft assertions are disabled).
        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();

        Ok(ExplainConfig {
            arity: v.arity.unwrap_or(enable_on_prod),
            cardinality: v.cardinality,
            column_names: v.column_names,
            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
            // Humanization and linear chains are incompatible with raw plans.
            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
            join_impls: v.join_implementations,
            keys: v.keys,
            linear_chains: !v.raw_plans && v.linear_chains,
            no_fast_path: v.no_fast_path,
            no_notices: v.no_notices,
            node_ids: v.node_identifiers,
            non_negative: v.non_negative,
            raw_plans: v.raw_plans,
            raw_syntax: v.raw_syntax,
            // Set later by `plan_explain_plan` from the requested format.
            verbose_syntax: false,
            redacted: v.redacted,
            subtree_size: v.subtree_size,
            equivalences: v.equivalences,
            timing: v.timing,
            types: v.types,
            // Optimizer feature overrides: only the flags surfaced as EXPLAIN
            // options are forwarded; the rest keep their defaults.
            features: OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: Default::default(),
                enable_eager_delta_joins: v.enable_eager_delta_joins,
                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
                enable_consolidate_after_union_negate: Default::default(),
                enable_reduce_mfp_fusion: Default::default(),
                enable_cardinality_estimates: Default::default(),
                persist_fast_path_limit: Default::default(),
                reoptimize_imported_views: v.reoptimize_imported_views,
                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse: v
                    .enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: Default::default(),
                enable_dequadratic_eqprop_map: Default::default(),
                enable_eq_classes_withholding_errors: Default::default(),
                enable_fast_path_plan_insights: Default::default(),
                enable_cast_elimination: Default::default(),
            },
        })
    }
}
645
/// Resolves an AST-level [`Explainee`] into its plan-level counterpart.
///
/// Catalog-object explainees (`VIEW`, `MATERIALIZED VIEW`, `INDEX`, and their
/// `REPLAN` variants) are validated against the catalog item's actual type.
/// Statement explainees (`SELECT`, `CREATE ...`, `SUBSCRIBE`) are planned
/// here, with DDL statements forced into `IF [NOT] EXISTS`-skip mode so that
/// EXPLAIN never actually creates anything.
fn plan_explainee(
    scx: &StatementContext,
    explainee: Explainee<Aug>,
    params: &Params,
) -> Result<plan::Explainee, PlanError> {
    use crate::plan::ExplaineeStatement;

    // Remember whether this is a REPLAN variant before we consume `explainee`.
    let is_replan = matches!(
        explainee,
        Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
    );

    let explainee = match explainee {
        Explainee::View(name) | Explainee::ReplanView(name) => {
            // The named item must actually be a view.
            let item = scx.get_item_by_resolved_name(&name)?;
            let item_type = item.item_type();
            if item_type != CatalogItemType::View {
                sql_bail!("Expected {name} to be a view, not a {item_type}");
            }
            match is_replan {
                true => crate::plan::Explainee::ReplanView(item.id()),
                false => crate::plan::Explainee::View(item.id()),
            }
        }
        Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
            // The named item must actually be a materialized view.
            let item = scx.get_item_by_resolved_name(&name)?;
            let item_type = item.item_type();
            if item_type != CatalogItemType::MaterializedView {
                sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
            }
            match is_replan {
                true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
                false => crate::plan::Explainee::MaterializedView(item.id()),
            }
        }
        Explainee::Index(name) | Explainee::ReplanIndex(name) => {
            // The named item must actually be an index.
            let item = scx.get_item_by_resolved_name(&name)?;
            let item_type = item.item_type();
            if item_type != CatalogItemType::Index {
                sql_bail!("Expected {name} to be an index, not a {item_type}");
            }
            match is_replan {
                true => crate::plan::Explainee::ReplanIndex(item.id()),
                false => crate::plan::Explainee::Index(item.id()),
            }
        }
        Explainee::Select(select, broken) => {
            // Plan the SELECT itself (no COPY TO target inside EXPLAIN).
            let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
            crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
        }
        Explainee::CreateView(mut stmt, broken) => {
            // Force IF EXISTS => Skip so EXPLAIN never mutates the catalog;
            // reject statements where the user set it explicitly.
            if stmt.if_exists != IfExistsBehavior::Skip {
                stmt.if_exists = IfExistsBehavior::Skip;
            } else {
                sql_bail!(
                    "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
                    (the behavior is implied within the scope of an enclosing EXPLAIN)"
                );
            }

            let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
                sql_bail!("expected CreateViewPlan plan");
            };

            crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
        }
        Explainee::CreateMaterializedView(mut stmt, broken) => {
            // Same IF EXISTS handling as CREATE VIEW above.
            if stmt.if_exists != IfExistsBehavior::Skip {
                stmt.if_exists = IfExistsBehavior::Skip;
            } else {
                sql_bail!(
                    "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
                    (the behavior is implied within the scope of an enclosing EXPLAIN)"
                );
            }

            let Plan::CreateMaterializedView(plan) =
                ddl::plan_create_materialized_view(scx, *stmt)?
            else {
                sql_bail!("expected CreateMaterializedViewPlan plan");
            };

            crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
                broken,
                plan,
            })
        }
        Explainee::CreateIndex(mut stmt, broken) => {
            // Same idea for CREATE INDEX, via its boolean `if_not_exists`.
            if !stmt.if_not_exists {
                stmt.if_not_exists = true;
            } else {
                sql_bail!(
                    "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
                    (the behavior is implied within the scope of an enclosing EXPLAIN)"
                );
            }

            let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
                sql_bail!("expected CreateIndexPlan plan");
            };

            crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
        }
        Explainee::Subscribe(stmt, broken) => {
            let Plan::Subscribe(plan) = plan_subscribe(scx, *stmt, params, None)? else {
                sql_bail!("expected SubscribePlan");
            };
            crate::plan::Explainee::Statement(ExplaineeStatement::Subscribe { broken, plan })
        }
    };

    Ok(explainee)
}
769
770pub fn plan_explain_plan(
771 scx: &StatementContext,
772 explain: ExplainPlanStatement<Aug>,
773 params: &Params,
774) -> Result<Plan, PlanError> {
775 let (format, verbose_syntax) = match explain.format() {
776 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
777 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
778 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
779 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
780 };
781 let stage = explain.stage();
782
783 let mut config = {
785 let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
786
787 if !scx.catalog.system_vars().persist_stats_filter_enabled() {
788 with_options.filter_pushdown = Some(false);
790 }
791
792 ExplainConfig::try_from(with_options)?
793 };
794 config.verbose_syntax = verbose_syntax;
795
796 let explainee = plan_explainee(scx, explain.explainee, params)?;
797
798 Ok(Plan::ExplainPlan(ExplainPlanPlan {
799 stage,
800 format,
801 config,
802 explainee,
803 }))
804}
805
/// Plans an `EXPLAIN SCHEMA` statement for a `CREATE SINK`.
///
/// The embedded `CREATE SINK` is planned under a placeholder name, and the
/// resulting Avro key or value schema is extracted. Only Kafka sinks with
/// Avro value schemas are supported.
pub fn plan_explain_schema(
    scx: &StatementContext,
    explain_schema: ExplainSinkSchemaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let ExplainSinkSchemaStatement {
        schema_for,
        // Format is validated elsewhere; only the schema itself is rendered.
        format: _,
        mut statement,
    } = explain_schema;

    // Give the anonymous sink a placeholder catalog name so that
    // `plan_create_sink` can proceed without creating anything real.
    statement.name = Some(UnresolvedItemName::qualified(&[
        ident!("mz_catalog"),
        ident!("mz_explain_schema"),
    ]));

    // Resolve Avro doc-comment options before planning, as purification
    // normally would for a real CREATE SINK.
    crate::pure::purify_create_sink_avro_doc_on_options(
        scx.catalog,
        *statement.from.item_id(),
        &mut statement.format,
    )?;

    match ddl::plan_create_sink(scx, statement)? {
        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
            // Only Kafka sinks whose *value* format is Avro expose a schema.
            StorageSinkConnection::Kafka(KafkaSinkConnection {
                format:
                    KafkaSinkFormat {
                        key_format,
                        value_format:
                            KafkaSinkFormatType::Avro {
                                schema: value_schema,
                                ..
                            },
                        ..
                    },
                ..
            }) => {
                let schema = match schema_for {
                    // KEY requires a key format that is itself Avro.
                    ExplainSinkSchemaFor::Key => key_format
                        .and_then(|f| match f {
                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
                            _ => None,
                        })
                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
                    ExplainSinkSchemaFor::Value => value_schema,
                };

                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
                    sink_from: sink.from,
                    json_schema: schema,
                }))
            }
            _ => bail_unsupported!(
                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
            ),
        },
        _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
    }
}
868
869pub fn plan_explain_pushdown(
870 scx: &StatementContext,
871 statement: ExplainPushdownStatement<Aug>,
872 params: &Params,
873) -> Result<Plan, PlanError> {
874 scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
875 let explainee = plan_explainee(scx, statement.explainee, params)?;
876 Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
877}
878
879pub fn plan_explain_analyze_object(
880 scx: &StatementContext,
881 statement: ExplainAnalyzeObjectStatement<Aug>,
882 params: &Params,
883) -> Result<Plan, PlanError> {
884 let explainee_name = statement
885 .explainee
886 .name()
887 .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
888 .full_name_str();
889 let explainee = plan_explainee(scx, statement.explainee, params)?;
890
891 match explainee {
892 plan::Explainee::Index(_index_id) => (),
893 plan::Explainee::MaterializedView(_item_id) => (),
894 _ => {
895 return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
896 }
897 };
898
899 let mut ctes = Vec::with_capacity(4); let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
914 let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
915 let mut predicates = vec![format!("mo.name = '{}'", explainee_name)];
916 let mut order_by = vec!["mlm.lir_id DESC"];
917
918 match statement.properties {
919 ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
920 properties,
921 skew,
922 }) => {
923 let mut worker_id = None;
924 let mut seen_properties = BTreeSet::new();
925 for property in properties {
926 if !seen_properties.insert(property) {
928 continue;
929 }
930
931 match property {
932 ExplainAnalyzeComputationProperty::Memory => {
933 ctes.push((
934 "summary_memory",
935 r#"
936 SELECT mlm.global_id AS global_id,
937 mlm.lir_id AS lir_id,
938 SUM(mas.size) AS total_memory,
939 SUM(mas.records) AS total_records,
940 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
941 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
942 FROM mz_introspection.mz_lir_mapping mlm
943 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
944 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
945 ON (mas.operator_id = valid_id)
946GROUP BY mlm.global_id, mlm.lir_id"#,
947 ));
948 from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");
949
950 if skew {
951 ctes.push((
952 "per_worker_memory",
953 r#"
954 SELECT mlm.global_id AS global_id,
955 mlm.lir_id AS lir_id,
956 mas.worker_id AS worker_id,
957 SUM(mas.size) AS worker_memory,
958 SUM(mas.records) AS worker_records
959 FROM mz_introspection.mz_lir_mapping mlm
960 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
961 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
962 ON (mas.operator_id = valid_id)
963GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
964 ));
965 from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");
966
967 if let Some(worker_id) = worker_id {
968 predicates.push(format!("pwm.worker_id = {worker_id}"));
969 } else {
970 worker_id = Some("pwm.worker_id");
971 columns.push("pwm.worker_id AS worker_id");
972 order_by.push("worker_id");
973 }
974
975 columns.extend([
976 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
977 "pg_size_pretty(pwm.worker_memory) AS worker_memory",
978 "pg_size_pretty(sm.avg_memory) AS avg_memory",
979 "pg_size_pretty(sm.total_memory) AS total_memory",
980 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
981 "pwm.worker_records AS worker_records",
982 "sm.avg_records AS avg_records",
983 "sm.total_records AS total_records",
984 ]);
985 } else {
986 columns.extend([
987 "pg_size_pretty(sm.total_memory) AS total_memory",
988 "sm.total_records AS total_records",
989 ]);
990 }
991 }
992 ExplainAnalyzeComputationProperty::Cpu => {
993 ctes.push((
994 "summary_cpu",
995 r#"
996 SELECT mlm.global_id AS global_id,
997 mlm.lir_id AS lir_id,
998 SUM(mse.elapsed_ns) AS total_ns,
999 CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
1000 FROM mz_introspection.mz_lir_mapping mlm
1001 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1002 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1003 ON (mse.id = valid_id)
1004GROUP BY mlm.global_id, mlm.lir_id"#,
1005 ));
1006 from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");
1007
1008 if skew {
1009 ctes.push((
1010 "per_worker_cpu",
1011 r#"
1012 SELECT mlm.global_id AS global_id,
1013 mlm.lir_id AS lir_id,
1014 mse.worker_id AS worker_id,
1015 SUM(mse.elapsed_ns) AS worker_ns
1016 FROM mz_introspection.mz_lir_mapping mlm
1017 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1018 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1019 ON (mse.id = valid_id)
1020GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1021 ));
1022 from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");
1023
1024 if let Some(worker_id) = worker_id {
1025 predicates.push(format!("pwc.worker_id = {worker_id}"));
1026 } else {
1027 worker_id = Some("pwc.worker_id");
1028 columns.push("pwc.worker_id AS worker_id");
1029 order_by.push("worker_id");
1030 }
1031
1032 columns.extend([
1033 "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
1034 "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
1035 "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
1036 ]);
1037 }
1038 columns.push(
1039 "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
1040 );
1041 }
1042 }
1043 }
1044 }
1045 ExplainAnalyzeProperty::Hints => {
1046 columns.extend([
1047 "megsa.levels AS levels",
1048 "megsa.to_cut AS to_cut",
1049 "megsa.hint AS hint",
1050 "pg_size_pretty(megsa.savings) AS savings",
1051 ]);
1052 from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
1053 "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
1054 mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
1055 }
1056 }
1057
1058 from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");
1059
1060 let ctes = if !ctes.is_empty() {
1061 format!(
1062 "WITH {}",
1063 separated(
1064 ",\n",
1065 ctes.iter()
1066 .map(|(name, defn)| format!("{name} AS ({defn})"))
1067 )
1068 )
1069 } else {
1070 String::new()
1071 };
1072 let columns = separated(", ", columns);
1073 let from = separated(" ", from);
1074 let predicates = separated(" AND ", predicates);
1075 let order_by = separated(", ", order_by);
1076 let query = format!(
1077 r#"{ctes}
1078SELECT {columns}
1079FROM {from}
1080WHERE {predicates}
1081ORDER BY {order_by}"#
1082 );
1083
1084 if statement.as_sql {
1085 let rows = vec![Row::pack_slice(&[Datum::String(
1086 &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1087 PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1088 })?,
1089 )])];
1090 let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1091
1092 Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1093 } else {
1094 let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1095 show_select.plan()
1096 }
1097}
1098
/// Plans `EXPLAIN ANALYZE ... FOR CLUSTER`: reports memory and/or CPU usage
/// aggregated per mappable object on the current cluster, optionally broken
/// down per worker when skew reporting is requested.
///
/// The plan is built by assembling a SQL query over `mz_introspection`
/// relations: CTE, column, `FROM`, `WHERE`, and `ORDER BY` fragments are
/// accumulated as strings and stitched together at the end. With `AS SQL`,
/// the pretty-printed query text is returned instead of a plan for its
/// results.
pub fn plan_explain_analyze_cluster(
    scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
    _params: &Params,
) -> Result<Plan, PlanError> {
    // Fragments of the query under assembly. `mz_mappable_objects` is the
    // anchor relation; per-property CTEs are LEFT JOINed onto it by
    // `global_id`.
    let mut ctes = Vec::with_capacity(4);
    let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
    let mut predicates = vec![];
    let mut order_by = vec![];

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
    // Records the worker-id column introduced by the first skewed property;
    // a later skewed property adds a predicate equating its worker id with
    // this one instead of emitting a second `worker_id` output column.
    let mut worker_id = None;
    // Deduplicate repeated properties (e.g., `MEMORY, MEMORY`).
    let mut seen_properties = BTreeSet::new();
    for property in properties {
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            ExplainAnalyzeComputationProperty::Memory => {
                if skew {
                    // Tracks whether this property introduced the
                    // `worker_id` column (and so should order by it below).
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        predicates.push(format!("om.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("om.worker_id");
                        columns.push("om.worker_id AS worker_id");
                        set_worker_id = true;
                    };

                    // Per-(object, operator) totals plus per-worker averages.
                    // `generate_series` expands each LIR operator's dataflow
                    // operator-id range so per-operator introspection rows
                    // can be matched to it.
                    ctes.push((
                        "per_operator_memory_summary",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mas.size) AS total_memory,
       SUM(mas.records) AS total_records,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
       ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    // Per-(object, operator, worker) memory/record sums.
                    ctes.push((
                        "per_operator_memory_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mas.worker_id AS worker_id,
       SUM(mas.size) AS worker_memory,
       SUM(mas.records) AS worker_records
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
       ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                    ));

                    // Per-operator skew ratios: each worker's usage relative
                    // to the per-worker average (NULL when the average is 0).
                    ctes.push((
                        "per_operator_memory_ratios",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.lir_id AS lir_id,
       pompw.worker_id AS worker_id,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
FROM per_operator_memory_per_worker pompw
     JOIN per_operator_memory_summary poms
       USING (global_id, lir_id)
"#,
                    ));

                    // Roll per-operator numbers up to (object, worker),
                    // keeping the worst (MAX) operator-level skew ratios.
                    ctes.push((
                        "object_memory",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.worker_id AS worker_id,
       MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
       MAX(pomr.records_ratio) AS max_operator_records_ratio,
       SUM(pompw.worker_memory) AS worker_memory,
       SUM(pompw.worker_records) AS worker_records
FROM per_operator_memory_per_worker pompw
     JOIN per_operator_memory_ratios pomr
       USING (global_id, worker_id, lir_id)
GROUP BY pompw.global_id, pompw.worker_id
"#,
                    ));

                    // Object-level totals and per-worker averages.
                    ctes.push((
                        "object_average_memory",
                        r#"
SELECT om.global_id AS global_id,
       SUM(om.worker_memory) AS total_memory,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
       SUM(om.worker_records) AS total_records,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
FROM object_memory om
GROUP BY om.global_id"#,
                    ));

                    from.push("LEFT JOIN object_memory om USING (global_id)");
                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");

                    columns.extend([
                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
                        "pg_size_pretty(om.worker_memory) AS worker_memory",
                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
                        "pg_size_pretty(oam.total_memory) AS total_memory",
                        "om.max_operator_records_ratio AS max_operator_records_ratio",
                        "om.worker_records AS worker_records",
                        "oam.avg_records AS avg_records",
                        "oam.total_records AS total_records",
                    ]);

                    // Most skewed / heaviest objects first.
                    order_by.extend([
                        "max_operator_memory_ratio DESC",
                        "max_operator_records_ratio DESC",
                        "om.worker_memory DESC",
                        "worker_records DESC",
                    ]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // No skew: only object-level totals are reported.
                    ctes.push((
                        "per_operator_memory_totals",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mas.size) AS total_memory,
       SUM(mas.records) AS total_records
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
       ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_memory_totals",
                        r#"
SELECT pomt.global_id AS global_id,
       SUM(pomt.total_memory) AS total_memory,
       SUM(pomt.total_records) AS total_records
FROM per_operator_memory_totals pomt
GROUP BY pomt.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
                    columns.extend([
                        "pg_size_pretty(omt.total_memory) AS total_memory",
                        "omt.total_records AS total_records",
                    ]);
                    order_by.extend(["omt.total_memory DESC", "total_records DESC"]);
                }
            }
            ExplainAnalyzeComputationProperty::Cpu => {
                if skew {
                    // Same structure as the MEMORY/skew branch, but over
                    // `mz_scheduling_elapsed_per_worker` (elapsed ns).
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        predicates.push(format!("oc.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("oc.worker_id");
                        columns.push("oc.worker_id AS worker_id");
                        set_worker_id = true;
                    };

                    // Per-(object, operator) CPU totals and averages.
                    ctes.push((
                        "per_operator_cpu_summary",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mse.elapsed_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
FROM mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
       ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    // Per-(object, operator, worker) CPU sums.
                    ctes.push((
                        "per_operator_cpu_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mse.worker_id AS worker_id,
       SUM(mse.elapsed_ns) AS worker_ns
FROM mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
       ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                    ));

                    // Per-operator CPU skew ratios.
                    ctes.push((
                        "per_operator_cpu_ratios",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.lir_id AS lir_id,
       pocpw.worker_id AS worker_id,
       CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
FROM per_operator_cpu_per_worker pocpw
     JOIN per_operator_cpu_summary pocs
       USING (global_id, lir_id)
"#,
                    ));

                    // Roll up to (object, worker), keeping the worst
                    // operator-level ratio.
                    ctes.push((
                        "object_cpu",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.worker_id AS worker_id,
       MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
       SUM(pocpw.worker_ns) AS worker_ns
FROM per_operator_cpu_per_worker pocpw
     JOIN per_operator_cpu_ratios pomr
       USING (global_id, worker_id, lir_id)
GROUP BY pocpw.global_id, pocpw.worker_id
"#,
                    ));

                    // Object-level CPU totals and per-worker averages.
                    ctes.push((
                        "object_average_cpu",
                        r#"
SELECT oc.global_id AS global_id,
       SUM(oc.worker_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
FROM object_cpu oc
GROUP BY oc.global_id"#,
                    ));

                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");

                    // Nanoseconds are rendered as intervals (ns / 1000 µs).
                    columns.extend([
                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
                    ]);

                    order_by.extend(["max_operator_cpu_ratio DESC", "worker_elapsed DESC"]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // No skew: only object-level CPU totals.
                    ctes.push((
                        "per_operator_cpu_totals",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mse.elapsed_ns) AS total_ns
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
       ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_cpu_totals",
                        r#"
SELECT poct.global_id AS global_id,
       SUM(poct.total_ns) AS total_ns
FROM per_operator_cpu_totals poct
GROUP BY poct.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
                    columns
                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
                    order_by.extend(["total_elapsed DESC"]);
                }
            }
        }
    }

    // Assemble the final query text. The WHERE clause is only emitted when
    // at least one predicate was accumulated.
    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    let predicates = if !predicates.is_empty() {
        format!("WHERE {}", separated(" AND ", predicates))
    } else {
        String::new()
    };
    // Final tiebreaker so output order is deterministic.
    order_by.push("mo.name DESC");
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
{predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // `AS SQL`: return the pretty-printed query text as a single row.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        // Otherwise plan the assembled query like a SHOW statement.
        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        show_select.plan()
    }
}
1464
1465pub fn plan_explain_timestamp(
1466 scx: &StatementContext,
1467 explain: ExplainTimestampStatement<Aug>,
1468) -> Result<Plan, PlanError> {
1469 let (format, _verbose_syntax) = match explain.format() {
1470 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1471 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1472 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1473 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1474 };
1475
1476 let raw_plan = {
1477 let query::PlannedRootQuery {
1478 expr: raw_plan,
1479 desc: _,
1480 finishing: _,
1481 scope: _,
1482 } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1483 if raw_plan.contains_parameters()? {
1484 return Err(PlanError::ParameterNotAllowed(
1485 "EXPLAIN TIMESTAMP".to_string(),
1486 ));
1487 }
1488
1489 raw_plan
1490 };
1491 let when = query::plan_as_of(scx, explain.select.as_of)?;
1492
1493 Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1494 format,
1495 raw_plan,
1496 when,
1497 }))
1498}
1499
1500#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."]
1503pub fn plan_query(
1504 scx: &StatementContext,
1505 query: Query<Aug>,
1506 params: &Params,
1507 lifetime: QueryLifetime,
1508) -> Result<query::PlannedRootQuery<MirRelationExpr>, PlanError> {
1509 let query::PlannedRootQuery {
1510 mut expr,
1511 desc,
1512 finishing,
1513 scope,
1514 } = query::plan_root_query(scx, query, lifetime)?;
1515 expr.bind_parameters(scx, lifetime, params)?;
1516
1517 Ok(query::PlannedRootQuery {
1518 expr: expr.lower(scx.catalog.system_vars(), None)?,
1520 desc,
1521 finishing,
1522 scope,
1523 })
1524}
1525
// Derives `SubscribeOptionExtracted`, collecting the boolean `SNAPSHOT` and
// `PROGRESS` options from a `SUBSCRIBE ... WITH (...)` clause.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1527
/// Computes the [`StatementDesc`] for a `SUBSCRIBE` statement: the names and
/// types of the columns it will produce.
///
/// The output always begins with a non-null `mz_timestamp` column; depending
/// on options and output mode it adds `mz_progressed`, `mz_diff` or
/// `mz_state`, key columns, and the underlying relation's columns (doubled
/// into `before_*`/`after_*` for `ENVELOPE DEBEZIUM`).
pub fn describe_subscribe(
    scx: &StatementContext,
    stmt: SubscribeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // Description of the subscribed relation: a named object or an ad-hoc
    // query planned here. Objects without a relation description (e.g.,
    // non-relational item types) cannot be subscribed to.
    let relation_desc = match stmt.relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            match item.relation_desc() {
                Some(desc) => desc.into_owned(),
                None => sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                ),
            }
        }
        SubscribeRelation::Query(query) => {
            let query::PlannedRootQuery { desc, .. } =
                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
            desc
        }
    };
    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
    let progress = progress.unwrap_or(false);
    // Leading `mz_timestamp` column: a scale-0 numeric, never null.
    let mut desc = RelationDesc::builder().with_column(
        "mz_timestamp",
        SqlScalarType::Numeric {
            max_scale: Some(NumericMaxScale::ZERO),
        }
        .nullable(false),
    );
    // `WITH (PROGRESS)` adds a boolean `mz_progressed` column.
    if progress {
        desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
    }

    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
    match stmt.output {
        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
            // Diff output: `mz_diff` followed by the relation's columns.
            // With progress enabled, every data column is made nullable.
            desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
            for (name, mut ty) in relation_desc.into_iter() {
                if progress {
                    ty.nullable = true;
                }
                desc = desc.with_column(name, ty);
            }
        }
        SubscribeOutput::EnvelopeUpsert { key_columns }
        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
            // Envelope output: `mz_state`, then key columns, then value
            // columns.
            desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
            let key_columns = key_columns
                .into_iter()
                .map(normalize::column_name)
                .collect_vec();
            let mut before_values_desc = RelationDesc::builder();
            let mut after_values_desc = RelationDesc::builder();

            // Key columns must exist in the relation; with progress enabled
            // they are also made nullable.
            for column_name in &key_columns {
                let mut column_ty = relation_desc
                    .get_by_name(column_name)
                    .map(|(_pos, ty)| ty.clone())
                    .ok_or_else(|| PlanError::UnknownColumn {
                        table: None,
                        column: column_name.clone(),
                        similar: Box::new([]),
                    })?;
                if progress {
                    column_ty.nullable = true;
                }
                desc = desc.with_column(column_name, column_ty);
            }

            // Non-key columns are always nullable. For Debezium they are
            // emitted twice, as `before_<name>` and `after_<name>`;
            // otherwise they keep their original names.
            for (mut name, mut ty) in relation_desc
                .into_iter()
                .filter(|(name, _ty)| !key_columns.contains(name))
            {
                ty.nullable = true;
                before_values_desc =
                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
                if debezium {
                    name = format!("after_{}", name).into();
                }
                after_values_desc = after_values_desc.with_column(name, ty);
            }

            // Only Debezium output includes the `before_*` columns.
            if debezium {
                desc = desc.concat(before_values_desc);
            }
            desc = desc.concat(after_values_desc);
        }
    }
    Ok(StatementDesc::new(Some(desc.finish())))
}
1623
/// Plans a `SUBSCRIBE` statement into a [`SubscribePlan`].
///
/// Resolves the subscribed relation (named object or ad-hoc query), the
/// `AS OF`/`UP TO` bounds, the output mode (diffs, upsert/Debezium envelope,
/// or `WITHIN TIMESTAMP ORDER BY`), and the `SNAPSHOT`/`PROGRESS` options.
pub fn plan_subscribe(
    scx: &StatementContext,
    SubscribeStatement {
        relation,
        options,
        as_of,
        up_to,
        output,
    }: SubscribeStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<Plan, PlanError> {
    // Resolve the subscription source: an existing object (subscribed by id)
    // or an ad-hoc query (planned and lowered here).
    let (from, desc, scope) = match relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            let Some(desc) = item.relation_desc() else {
                sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                );
            };
            let item_name = match name {
                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
                _ => None,
            };
            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
            (
                SubscribeFrom::Id(item.global_id()),
                desc.into_owned(),
                scope,
            )
        }
        SubscribeRelation::Query(query) => {
            #[allow(deprecated)]
            let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?;
            // A subscribed query has no output ordering/limit of its own, so
            // the finishing produced by planning must be trivial.
            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
                &query.finishing,
                query.desc.arity()
            ));
            let desc = query.desc.clone();
            (
                SubscribeFrom::Query {
                    expr: query.expr,
                    desc: query.desc,
                },
                desc,
                query.scope,
            )
        }
    };

    // Temporal bounds of the subscription.
    let when = query::plan_as_of(scx, as_of)?;
    let up_to = up_to
        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
        .transpose()?;

    // Expression context for planning the key / order-by expressions of the
    // output modes below. Parameters and subqueries are permitted;
    // aggregates and window functions are not.
    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
    let ecx = ExprContext {
        qcx: &qcx,
        name: "",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
    let output = match output {
        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
        SubscribeOutput::EnvelopeUpsert { key_columns } => {
            // Plan the `KEY (...)` columns as ORDER BY expressions. Any
            // produced map expressions mean the keys were not bare column
            // references, which is an error.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE UPSERT KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
            }
            plan::SubscribeOutput::EnvelopeUpsert {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::EnvelopeDebezium { key_columns } => {
            // Debezium envelope output is feature-flagged; otherwise the
            // key handling mirrors ENVELOPE UPSERT above.
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE DEBEZIUM KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
            }
            plan::SubscribeOutput::EnvelopeDebezium {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
            // `mz_diff` is a legal sort column here: prepend it at position
            // 0, shifting the relation's columns by one.
            let mz_diff = "mz_diff".into();
            let output_columns = std::iter::once((0, &mz_diff))
                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
                .collect_vec();
            match query::plan_order_by_exprs(
                &ExprContext {
                    name: "WITHIN TIMESTAMP ORDER BY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            ) {
                // An unknown-column error specifically for `mz_diff` is
                // reported as an invalid ORDER BY instead.
                Err(PlanError::UnknownColumn {
                    table: None,
                    column,
                    similar: _,
                }) if &column == &mz_diff => {
                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                }
                Err(e) => return Err(e),
                Ok((order_by, map_exprs)) => {
                    // Computed sort expressions are not allowed either.
                    if !map_exprs.is_empty() {
                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                    }

                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
                }
            }
        }
    };

    // Option defaults: snapshot on, progress off.
    let SubscribeOptionExtracted {
        progress, snapshot, ..
    } = options.try_into()?;
    Ok(Plan::Subscribe(SubscribePlan {
        from,
        when,
        up_to,
        with_snapshot: snapshot.unwrap_or(true),
        copy_to,
        emit_progress: progress.unwrap_or(false),
        output,
    }))
}
1796
1797pub fn describe_copy_from_table(
1798 scx: &StatementContext,
1799 table_name: <Aug as AstInfo>::ItemName,
1800 columns: Vec<Ident>,
1801) -> Result<StatementDesc, PlanError> {
1802 let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1803 Ok(StatementDesc::new(Some(desc)))
1804}
1805
1806pub fn describe_copy_item(
1807 scx: &StatementContext,
1808 object_name: <Aug as AstInfo>::ItemName,
1809 columns: Vec<Ident>,
1810) -> Result<StatementDesc, PlanError> {
1811 let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1812 Ok(StatementDesc::new(Some(desc)))
1813}
1814
1815pub fn describe_copy(
1816 scx: &StatementContext,
1817 CopyStatement {
1818 relation,
1819 direction,
1820 ..
1821 }: CopyStatement<Aug>,
1822) -> Result<StatementDesc, PlanError> {
1823 Ok(match (relation, direction) {
1824 (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1825 describe_copy_item(scx, name, columns)?
1826 }
1827 (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1828 describe_copy_from_table(scx, name, columns)?
1829 }
1830 (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1831 (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1832 }
1833 .with_is_copy())
1834}
1835
1836fn plan_copy_to_expr(
1837 scx: &StatementContext,
1838 select_plan: SelectPlan,
1839 desc: RelationDesc,
1840 to: &Expr<Aug>,
1841 format: CopyFormat,
1842 options: CopyOptionExtracted,
1843) -> Result<Plan, PlanError> {
1844 let conn_id = match options.aws_connection {
1845 Some(conn_id) => CatalogItemId::from(conn_id),
1846 None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1847 };
1848 let connection = scx.get_item(&conn_id).connection()?;
1849
1850 match connection {
1851 mz_storage_types::connections::Connection::Aws(_) => {}
1852 _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1853 }
1854
1855 let format = match format {
1856 CopyFormat::Csv => {
1857 let quote = extract_byte_param_value(options.quote, "quote")?;
1858 let escape = extract_byte_param_value(options.escape, "escape")?;
1859 let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1860 S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1861 CopyCsvFormatParams::try_new(
1862 delimiter,
1863 quote,
1864 escape,
1865 options.header,
1866 options.null,
1867 )
1868 .map_err(|e| sql_err!("{}", e))?,
1869 ))
1870 }
1871 CopyFormat::Parquet => {
1872 ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
1874 S3SinkFormat::Parquet
1875 }
1876 CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1877 CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1878 };
1879
1880 let mut to_expr = to.clone();
1882 transform_ast::transform(scx, &mut to_expr)?;
1883 let relation_type = RelationDesc::empty();
1884 let ecx = &ExprContext {
1885 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1886 name: "COPY TO target",
1887 scope: &Scope::empty(),
1888 relation_type: relation_type.typ(),
1889 allow_aggregates: false,
1890 allow_subqueries: false,
1891 allow_parameters: false,
1892 allow_windows: false,
1893 };
1894
1895 let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;
1896
1897 if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1898 sql_bail!(
1899 "MAX FILE SIZE cannot be less than {}",
1900 MIN_S3_SINK_FILE_SIZE
1901 );
1902 }
1903 if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1904 sql_bail!(
1905 "MAX FILE SIZE cannot be greater than {}",
1906 MAX_S3_SINK_FILE_SIZE
1907 );
1908 }
1909
1910 Ok(Plan::CopyTo(CopyToPlan {
1911 select_plan,
1912 desc,
1913 to,
1914 connection: connection.to_owned(),
1915 connection_id: conn_id,
1916 format,
1917 max_file_size: options.max_file_size.as_bytes(),
1918 }))
1919}
1920
/// Plans `COPY ... FROM`: ingesting data into `table_name` from stdin, a
/// URL, or S3 (when an AWS connection is supplied).
///
/// Validates the format-specific options, resolves the data source, builds
/// decoding parameters, and checks the `FILES`/`PATTERN` filters before
/// producing a [`CopyFromPlan`].
fn plan_copy_from(
    scx: &StatementContext,
    target: &CopyTarget<Aug>,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // Rejects options that are only meaningful in CSV mode.
    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
        match option {
            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
            None => Ok(()),
        }
    }

    // Resolve the data source.
    let source = match target {
        CopyTarget::Stdin => CopyFromSource::Stdin,
        CopyTarget::Expr(from) => {
            // Copying from a remote location is feature-flagged.
            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;

            // Plan the source expression as a string. It is planned against
            // an empty relation, so it may not reference columns,
            // aggregates, subqueries, parameters, or windows.
            let mut from_expr = from.clone();
            transform_ast::transform(scx, &mut from_expr)?;
            let relation_type = RelationDesc::empty();
            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "COPY FROM target",
                scope: &Scope::empty(),
                relation_type: relation_type.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;

            // With an AWS connection the expression is treated as an S3
            // URI; without one, as a plain URL.
            match options.aws_connection {
                Some(conn_id) => {
                    let conn_id = CatalogItemId::from(conn_id);

                    let connection = match scx.get_item(&conn_id).connection()? {
                        mz_storage_types::connections::Connection::Aws(conn) => conn,
                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
                    };

                    CopyFromSource::AwsS3 {
                        uri: from,
                        connection,
                        connection_id: conn_id,
                    }
                }
                None => CopyFromSource::Url(from),
            }
        }
        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
    };

    // Translate the format and its options into decoding parameters.
    let params = match format {
        CopyFormat::Text => {
            only_available_with_csv(options.quote, "quote")?;
            only_available_with_csv(options.escape, "escape")?;
            only_available_with_csv(options.header, "HEADER")?;
            // TEXT defaults: tab delimiter, `\N` for NULL.
            let delimiter =
                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
            let null = match options.null {
                Some(null) => Cow::from(null),
                None => Cow::from("\\N"),
            };
            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
        }
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            )
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Parquet => CopyFormatParams::Parquet,
    };

    // FILES and PATTERN are mutually exclusive source filters.
    let filter = match (options.files, options.pattern) {
        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
        (Some(files), None) => Some(CopyFromFilter::Files(files)),
        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
        (None, None) => None,
    };

    // Filters only apply to remote sources, not stdin.
    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
    }

    let table_name_string = table_name.full_name_str();

    // Resolve the target table, its source description, the column list,
    // and the MFP that maps source rows onto the table's shape.
    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;

    // COPY FROM planning is expected to always produce an MFP.
    let Some(mfp) = maybe_mfp else {
        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
    };

    Ok(Plan::CopyFrom(CopyFromPlan {
        target_id: id,
        target_name: table_name_string,
        source,
        columns,
        source_desc,
        mfp,
        params,
        filter,
    }))
}
2041
2042fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2043 match v {
2044 Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2045 Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2046 None => Ok(None),
2047 }
2048}
2049
// Generates `CopyOptionExtracted` — a struct with one typed field per option
// name listed here — plus a `TryFrom<Vec<CopyOption<Aug>>>` impl that parses
// the raw AST option list into it (used at the top of `plan_copy`). Options
// without a `Default(...)` entry extract as `None`/`false` when unspecified;
// `MAX FILE SIZE` defaults to 256 MB.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
2063
/// Plans a `COPY` statement, dispatching on the (direction, target) pair:
///
/// * `COPY ... TO STDOUT` — planned via `plan_select`/`plan_subscribe` with a
///   copy format attached (defaulting to `TEXT`).
/// * `COPY ... FROM <target>` — only named relations are supported; delegated
///   to `plan_copy_from`.
/// * `COPY ... TO <expr>` — e.g. copy-to-S3; delegated to `plan_copy_to_expr`.
///
/// All other combinations produce a planning error.
pub fn plan_copy(
    scx: &StatementContext,
    CopyStatement {
        relation,
        direction,
        target,
        options,
    }: CopyStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Parse the raw AST option list into the typed `CopyOptionExtracted`.
    let options = CopyOptionExtracted::try_from(options)?;
    // FORMAT is matched case-insensitively. `None` is preserved here so each
    // branch below can apply its own defaulting/requirement rules.
    let format = options
        .format
        .as_ref()
        .map(|format| match format.to_lowercase().as_str() {
            "text" => Ok(CopyFormat::Text),
            "csv" => Ok(CopyFormat::Csv),
            "binary" => Ok(CopyFormat::Binary),
            "parquet" => Ok(CopyFormat::Parquet),
            _ => sql_bail!("unknown FORMAT: {}", format),
        })
        .transpose()?;

    match (&direction, &target) {
        (CopyDirection::To, CopyTarget::Stdout) => {
            // These formatting options are not yet wired up for COPY TO
            // STDOUT, so reject them explicitly rather than ignore them.
            if options.delimiter.is_some() {
                sql_bail!("COPY TO does not support DELIMITER option yet");
            }
            if options.quote.is_some() {
                sql_bail!("COPY TO does not support QUOTE option yet");
            }
            if options.null.is_some() {
                sql_bail!("COPY TO does not support NULL option yet");
            }
            match relation {
                CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
                // SELECT and SUBSCRIBE are planned as usual, with the copy
                // format (defaulting to TEXT) threaded through.
                CopyRelation::Select(stmt) => Ok(plan_select(
                    scx,
                    stmt,
                    &Params::empty(),
                    Some(format.unwrap_or(CopyFormat::Text)),
                )?),
                CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
                    scx,
                    stmt,
                    &Params::empty(),
                    Some(format.unwrap_or(CopyFormat::Text)),
                )?),
            }
        }
        // COPY FROM only supports copying into a named relation.
        (CopyDirection::From, target) => match relation {
            CopyRelation::Named { name, columns } => plan_copy_from(
                scx,
                target,
                name,
                columns,
                format.unwrap_or(CopyFormat::Text),
                options,
            ),
            _ => sql_bail!("COPY FROM {} not supported", target),
        },
        (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
            // COPY TO <expr> is feature-flag gated, except for system roles.
            if !scx.catalog.active_role_id().is_system() {
                scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
            }

            // Unlike the STDOUT branch, there is no default format here: the
            // user must say explicitly what bytes to write to the target.
            let format = match format {
                Some(inner) => inner,
                _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
            };

            let stmt = match relation {
                CopyRelation::Named { name, columns } => {
                    if !columns.is_empty() {
                        sql_bail!(
                            "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
                        );
                    }
                    // Desugar `COPY <name> TO ...` into a full-table SELECT
                    // so both forms share the planning path below.
                    let query = Query {
                        ctes: CteBlock::empty(),
                        body: SetExpr::Table(name),
                        order_by: vec![],
                        limit: None,
                        offset: None,
                    };
                    SelectStatement { query, as_of: None }
                }
                CopyRelation::Select(stmt) => {
                    // Output files have no meaningful row order, so an ORDER
                    // BY on the inner query is rejected rather than ignored.
                    if !stmt.query.order_by.is_empty() {
                        sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
                    }
                    stmt
                }
                _ => sql_bail!("COPY {} {} not supported", direction, target),
            };

            // Plan the query without a copy format (the format is handled by
            // the copy-to plan itself), then build the copy-to-expr plan.
            let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
            plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
        }
        _ => sql_bail!("COPY {} {} not supported", direction, target),
    }
}