1use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19
20use mz_arrow_util::builder::ArrowBuilder;
21use mz_expr::visit::Visit;
22use mz_expr::{MirRelationExpr, RowSetFinishing};
23use mz_ore::num::NonNeg;
24use mz_ore::soft_panic_or_log;
25use mz_ore::str::separated;
26use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
27use mz_repr::adt::numeric::NumericMaxScale;
28use mz_repr::bytes::ByteSize;
29use mz_repr::explain::{ExplainConfig, ExplainFormat};
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
32use mz_sql_parser::ast::{
33 CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
34 ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
35 ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
36 ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
37 SetExpr, SubscribeOutput, UnresolvedItemName,
38};
39use mz_sql_parser::ident;
40use mz_storage_types::sinks::{
41 KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
42 MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
43};
44
45use crate::ast::display::AstDisplay;
46use crate::ast::{
47 AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
48 DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
49 SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
50 UpdateStatement,
51};
52use crate::catalog::CatalogItemType;
53use crate::names::{Aug, ResolvedItemName};
54use crate::normalize;
55use crate::plan::query::{
56 ExprContext, QueryLifetime, offset_into_value, plan_as_of_or_up_to, plan_expr,
57};
58use crate::plan::scope::Scope;
59use crate::plan::statement::show::ShowSelect;
60use crate::plan::statement::{StatementContext, StatementDesc, ddl};
61use crate::plan::{
62 self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
63 ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
64};
65use crate::plan::{
66 CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
67 QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
68};
69use crate::plan::{CopyFromSource, with_options};
70use crate::session::vars::{self, ENABLE_COPY_FROM_REMOTE};
71
72pub fn describe_insert(
79 scx: &StatementContext,
80 InsertStatement {
81 table_name,
82 columns,
83 source,
84 returning,
85 }: InsertStatement<Aug>,
86) -> Result<StatementDesc, PlanError> {
87 let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
88 let desc = if returning.expr.is_empty() {
89 None
90 } else {
91 Some(returning.desc)
92 };
93 Ok(StatementDesc::new(desc))
94}
95
96pub fn plan_insert(
97 scx: &StatementContext,
98 InsertStatement {
99 table_name,
100 columns,
101 source,
102 returning,
103 }: InsertStatement<Aug>,
104 params: &Params,
105) -> Result<Plan, PlanError> {
106 let (id, mut expr, returning) =
107 query::plan_insert_query(scx, table_name, columns, source, returning)?;
108 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
109 let returning = returning
110 .expr
111 .into_iter()
112 .map(|mut expr| {
113 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
114 expr.lower_uncorrelated(scx.catalog.system_vars())
115 })
116 .collect::<Result<Vec<_>, _>>()?;
117
118 Ok(Plan::Insert(InsertPlan {
119 id,
120 values: expr,
121 returning,
122 }))
123}
124
125pub fn describe_delete(
126 scx: &StatementContext,
127 stmt: DeleteStatement<Aug>,
128) -> Result<StatementDesc, PlanError> {
129 query::plan_delete_query(scx, stmt)?;
130 Ok(StatementDesc::new(None))
131}
132
133pub fn plan_delete(
134 scx: &StatementContext,
135 stmt: DeleteStatement<Aug>,
136 params: &Params,
137) -> Result<Plan, PlanError> {
138 let rtw_plan = query::plan_delete_query(scx, stmt)?;
139 plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
140}
141
142pub fn describe_update(
143 scx: &StatementContext,
144 stmt: UpdateStatement<Aug>,
145) -> Result<StatementDesc, PlanError> {
146 query::plan_update_query(scx, stmt)?;
147 Ok(StatementDesc::new(None))
148}
149
150pub fn plan_update(
151 scx: &StatementContext,
152 stmt: UpdateStatement<Aug>,
153 params: &Params,
154) -> Result<Plan, PlanError> {
155 let rtw_plan = query::plan_update_query(scx, stmt)?;
156 plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
157}
158
159pub fn plan_read_then_write(
160 scx: &StatementContext,
161 kind: MutationKind,
162 params: &Params,
163 query::ReadThenWritePlan {
164 id,
165 mut selection,
166 finishing,
167 assignments,
168 }: query::ReadThenWritePlan,
169) -> Result<Plan, PlanError> {
170 selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
171 let mut assignments_outer = BTreeMap::new();
172 for (idx, mut set) in assignments {
173 set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
174 let set = set.lower_uncorrelated(scx.catalog.system_vars())?;
175 assignments_outer.insert(idx, set);
176 }
177
178 Ok(Plan::ReadThenWrite(ReadThenWritePlan {
179 id,
180 selection,
181 finishing,
182 assignments: assignments_outer,
183 kind,
184 returning: Vec::new(),
185 }))
186}
187
188pub fn describe_select(
189 scx: &StatementContext,
190 stmt: SelectStatement<Aug>,
191) -> Result<StatementDesc, PlanError> {
192 if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
193 return Ok(StatementDesc::new(Some(desc)));
194 }
195
196 let query::PlannedRootQuery { desc, .. } =
197 query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
198 Ok(StatementDesc::new(Some(desc)))
199}
200
201pub fn plan_select(
202 scx: &StatementContext,
203 select: SelectStatement<Aug>,
204 params: &Params,
205 copy_to: Option<CopyFormat>,
206) -> Result<Plan, PlanError> {
207 if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
208 return Ok(Plan::SideEffectingFunc(f));
209 }
210
211 let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
212 Ok(Plan::Select(plan))
213}
214
/// Plans a `SELECT` into a [`SelectPlan`] plus the relation description of
/// its result.
///
/// Binds `params` into the query body, into any `TopK` offsets inside the
/// expression tree, and into the top-level `LIMIT`/`OFFSET` of the row-set
/// finishing, requiring the latter two to evaluate to constants.
fn plan_select_inner(
    scx: &StatementContext,
    select: SelectStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<(SelectPlan, RelationDesc), PlanError> {
    // `AS OF` determines the timestamp selection for this one-shot query.
    let when = query::plan_as_of(scx, select.as_of.clone())?;
    let lifetime = QueryLifetime::OneShot;
    let query::PlannedRootQuery {
        mut expr,
        desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
    expr.bind_parameters(scx, lifetime, params)?;

    // Replace every `TopK` offset in the tree with the Int64 literal it
    // evaluates to; `offset_into_value` errors if it is not a valid constant.
    expr.try_visit_mut_pre(&mut |expr| {
        if let HirRelationExpr::TopK { offset, .. } = expr {
            let offset_value = offset_into_value(offset.take())?;
            *offset = HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64);
        }
        Ok::<(), PlanError>(())
    })?;
    // The top-level LIMIT must be a non-negative Int64 constant (or NULL,
    // which means "no limit").
    let limit = match finishing.limit {
        None => None,
        Some(mut limit) => {
            limit.bind_parameters(scx, lifetime, params)?;
            let Some(limit) = limit.as_literal() else {
                sql_bail!(
                    "Top-level LIMIT must be a constant expression, got {}",
                    limit
                )
            };
            match limit {
                Datum::Null => None,
                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
                _ => {
                    // Earlier validation should have rejected this; log loudly
                    // in debug builds but still fail the query gracefully.
                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
                    sql_bail!("LIMIT must be a non-negative INT or NULL")
                }
            }
        }
    };
    // The top-level OFFSET must likewise evaluate to a constant;
    // `offset_into_value` rejects negative values, so the conversion below
    // cannot fail.
    let offset = {
        let mut offset = finishing.offset.clone();
        offset.bind_parameters(scx, lifetime, params)?;
        let offset = offset_into_value(offset.take())?;
        offset
            .try_into()
            .expect("checked in offset_into_value that it is not negative")
    };

    let plan = SelectPlan {
        source: expr,
        when,
        finishing: RowSetFinishing {
            limit,
            offset,
            project: finishing.project,
            order_by: finishing.order_by,
        },
        copy_to,
        // Keep the original AST around (e.g. for later introspection of the
        // planned statement).
        select: Some(Box::new(select)),
    };

    Ok((plan, desc))
}
294
295pub fn describe_explain_plan(
296 scx: &StatementContext,
297 explain: ExplainPlanStatement<Aug>,
298) -> Result<StatementDesc, PlanError> {
299 let mut relation_desc = RelationDesc::builder();
300
301 match explain.stage() {
302 ExplainStage::RawPlan => {
303 let name = "Raw Plan";
304 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
305 }
306 ExplainStage::DecorrelatedPlan => {
307 let name = "Decorrelated Plan";
308 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
309 }
310 ExplainStage::LocalPlan => {
311 let name = "Locally Optimized Plan";
312 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
313 }
314 ExplainStage::GlobalPlan => {
315 let name = "Optimized Plan";
316 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
317 }
318 ExplainStage::PhysicalPlan => {
319 let name = "Physical Plan";
320 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
321 }
322 ExplainStage::Trace => {
323 relation_desc = relation_desc
324 .with_column("Time", SqlScalarType::UInt64.nullable(false))
325 .with_column("Path", SqlScalarType::String.nullable(false))
326 .with_column("Plan", SqlScalarType::String.nullable(false));
327 }
328 ExplainStage::PlanInsights => {
329 let name = "Plan Insights";
330 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
331 }
332 };
333 let relation_desc = relation_desc.finish();
334
335 Ok(
336 StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
337 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
338 _ => vec![],
339 }),
340 )
341}
342
343pub fn describe_explain_pushdown(
344 scx: &StatementContext,
345 statement: ExplainPushdownStatement<Aug>,
346) -> Result<StatementDesc, PlanError> {
347 let relation_desc = RelationDesc::builder()
348 .with_column("Source", SqlScalarType::String.nullable(false))
349 .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
350 .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
351 .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
352 .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
353 .finish();
354
355 Ok(
356 StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
357 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
358 _ => vec![],
359 }),
360 )
361}
362
/// Describes an `EXPLAIN ANALYZE` statement on a single object.
///
/// With `AS SQL`, the output is a single `SQL` column. Otherwise the column
/// set depends on the requested properties: one `operator` column always,
/// a `worker_id` column when per-worker skew is requested, and per-property
/// memory/CPU columns. The column order here must match the query generated
/// by `plan_explain_analyze_object`.
pub fn describe_explain_analyze_object(
    _scx: &StatementContext,
    statement: ExplainAnalyzeObjectStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // `AS SQL` short-circuits: the statement renders its own SQL text.
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    match statement.properties {
        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
            properties,
            skew,
        }) => {
            let mut relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(false));

            // Skew output is broken down per worker.
            if skew {
                relation_desc =
                    relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
            }

            // Duplicate properties contribute their columns only once.
            let mut seen_properties = BTreeSet::new();
            for property in properties {
                if !seen_properties.insert(property) {
                    continue;
                }

                match property {
                    // MEMORY with SKEW: per-worker ratios plus summary columns.
                    ExplainAnalyzeComputationProperty::Memory if skew => {
                        let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                        relation_desc = relation_desc
                            .with_column("memory_ratio", numeric.clone())
                            .with_column("worker_memory", SqlScalarType::String.nullable(true))
                            .with_column("avg_memory", SqlScalarType::String.nullable(true))
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column("records_ratio", numeric.clone())
                            .with_column("worker_records", numeric.clone())
                            .with_column("avg_records", numeric.clone())
                            .with_column("total_records", numeric);
                    }
                    // MEMORY without SKEW: totals only.
                    ExplainAnalyzeComputationProperty::Memory => {
                        relation_desc = relation_desc
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column(
                                "total_records",
                                SqlScalarType::Numeric { max_scale: None }.nullable(true),
                            );
                    }
                    // CPU: per-worker columns only with SKEW; the total is
                    // reported either way.
                    ExplainAnalyzeComputationProperty::Cpu => {
                        if skew {
                            relation_desc = relation_desc
                                .with_column(
                                    "cpu_ratio",
                                    SqlScalarType::Numeric { max_scale: None }.nullable(true),
                                )
                                .with_column(
                                    "worker_elapsed",
                                    SqlScalarType::Interval.nullable(true),
                                )
                                .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
                        }
                        relation_desc = relation_desc
                            .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
                    }
                }
            }

            let relation_desc = relation_desc.finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
        // HINTS: group-size advice columns for the object's operators.
        ExplainAnalyzeProperty::Hints => {
            let relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(true))
                .with_column("levels", SqlScalarType::Int64.nullable(true))
                .with_column("to_cut", SqlScalarType::Int64.nullable(true))
                .with_column("hint", SqlScalarType::Float64.nullable(true))
                .with_column("savings", SqlScalarType::String.nullable(true))
                .finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
    }
}
449
/// Describes an `EXPLAIN ANALYZE` statement over a whole cluster.
///
/// With `AS SQL`, the output is a single `SQL` column. Otherwise each row is
/// an object on the cluster (`object`, `global_id`), optionally broken down
/// per worker (`worker_id` when skew is requested), followed by per-property
/// memory/CPU columns. Cluster-level skew columns use `max_operator_*_ratio`
/// names, unlike the per-object variant.
pub fn describe_explain_analyze_cluster(
    _scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
) -> Result<StatementDesc, PlanError> {
    // `AS SQL` short-circuits: the statement renders its own SQL text.
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;

    let mut relation_desc = RelationDesc::builder()
        .with_column("object", SqlScalarType::String.nullable(false))
        .with_column("global_id", SqlScalarType::String.nullable(false));

    // Skew output is broken down per worker.
    if skew {
        relation_desc =
            relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
    }

    // Duplicate properties contribute their columns only once.
    let mut seen_properties = BTreeSet::new();
    for property in properties {
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            // MEMORY with SKEW: per-worker ratios plus summary columns.
            ExplainAnalyzeComputationProperty::Memory if skew => {
                let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                relation_desc = relation_desc
                    .with_column("max_operator_memory_ratio", numeric.clone())
                    .with_column("worker_memory", SqlScalarType::String.nullable(true))
                    .with_column("avg_memory", SqlScalarType::String.nullable(true))
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column("max_operator_records_ratio", numeric.clone())
                    .with_column("worker_records", numeric.clone())
                    .with_column("avg_records", numeric.clone())
                    .with_column("total_records", numeric);
            }
            // MEMORY without SKEW: totals only.
            ExplainAnalyzeComputationProperty::Memory => {
                relation_desc = relation_desc
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column(
                        "total_records",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    );
            }
            // CPU with SKEW: per-worker ratio and elapsed columns plus total.
            ExplainAnalyzeComputationProperty::Cpu if skew => {
                relation_desc = relation_desc
                    .with_column(
                        "max_operator_cpu_ratio",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    )
                    .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
            // CPU without SKEW: total only.
            ExplainAnalyzeComputationProperty::Cpu => {
                relation_desc = relation_desc
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
        }
    }

    Ok(StatementDesc::new(Some(relation_desc.finish())))
}
519
520pub fn describe_explain_timestamp(
521 scx: &StatementContext,
522 ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
523) -> Result<StatementDesc, PlanError> {
524 let relation_desc = RelationDesc::builder()
525 .with_column("Timestamp", SqlScalarType::String.nullable(false))
526 .finish();
527
528 Ok(StatementDesc::new(Some(relation_desc))
529 .with_params(describe_select(scx, select)?.param_types))
530}
531
532pub fn describe_explain_schema(
533 _: &StatementContext,
534 ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
535) -> Result<StatementDesc, PlanError> {
536 let relation_desc = RelationDesc::builder()
537 .with_column("Schema", SqlScalarType::String.nullable(false))
538 .finish();
539 Ok(StatementDesc::new(Some(relation_desc)))
540}
541
// Declares `ExplainPlanOptionExtracted`: a typed collection of the `WITH
// (...)` options accepted by `EXPLAIN PLAN`, with the listed defaults.
// `Option<bool>` entries distinguish "not specified" from an explicit
// `true`/`false` (their effective defaults are resolved later, in
// `TryFrom<ExplainPlanOptionExtracted> for ExplainConfig`).
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
584
impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
    type Error = PlanError;

    /// Resolves extracted `EXPLAIN` options into an [`ExplainConfig`].
    ///
    /// `RAW` is shorthand for `RAW PLANS` + `RAW SYNTAX`. Options extracted
    /// as `Option<bool>` fall back to `enable_on_prod` when unspecified.
    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
        // `RAW` implies both raw-plan and raw-syntax output.
        if v.raw {
            v.raw_plans = true;
            v.raw_syntax = true;
        }

        // Unspecified options default to enabled only when soft assertions
        // are off (i.e., in production builds).
        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();

        Ok(ExplainConfig {
            arity: v.arity.unwrap_or(enable_on_prod),
            cardinality: v.cardinality,
            column_names: v.column_names,
            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
            // Humanization and linear chains are meaningless for raw plans.
            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
            join_impls: v.join_implementations,
            keys: v.keys,
            linear_chains: !v.raw_plans && v.linear_chains,
            no_fast_path: v.no_fast_path,
            no_notices: v.no_notices,
            node_ids: v.node_identifiers,
            non_negative: v.non_negative,
            raw_plans: v.raw_plans,
            raw_syntax: v.raw_syntax,
            // Set by the caller (e.g. `plan_explain_plan`) from the requested
            // output format, not from an option.
            verbose_syntax: false,
            redacted: v.redacted,
            subtree_size: v.subtree_size,
            equivalences: v.equivalences,
            timing: v.timing,
            types: v.types,
            // Only the feature overrides with a corresponding EXPLAIN option
            // are threaded through; the rest keep their defaults.
            features: OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: Default::default(),
                enable_eager_delta_joins: v.enable_eager_delta_joins,
                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
                enable_consolidate_after_union_negate: Default::default(),
                enable_reduce_mfp_fusion: Default::default(),
                enable_cardinality_estimates: Default::default(),
                persist_fast_path_limit: Default::default(),
                reoptimize_imported_views: v.reoptimize_imported_views,
                enable_reduce_reduction: Default::default(),
                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse: v
                    .enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: Default::default(),
                enable_dequadratic_eqprop_map: Default::default(),
                enable_eq_classes_withholding_errors: Default::default(),
                enable_fast_path_plan_insights: Default::default(),
                enable_cast_elimination: Default::default(),
            },
        })
    }
}
646
647fn plan_explainee(
648 scx: &StatementContext,
649 explainee: Explainee<Aug>,
650 params: &Params,
651) -> Result<plan::Explainee, PlanError> {
652 use crate::plan::ExplaineeStatement;
653
654 let is_replan = matches!(
655 explainee,
656 Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
657 );
658
659 let explainee = match explainee {
660 Explainee::View(name) | Explainee::ReplanView(name) => {
661 let item = scx.get_item_by_resolved_name(&name)?;
662 let item_type = item.item_type();
663 if item_type != CatalogItemType::View {
664 sql_bail!("Expected {name} to be a view, not a {item_type}");
665 }
666 match is_replan {
667 true => crate::plan::Explainee::ReplanView(item.id()),
668 false => crate::plan::Explainee::View(item.id()),
669 }
670 }
671 Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
672 let item = scx.get_item_by_resolved_name(&name)?;
673 let item_type = item.item_type();
674 if item_type != CatalogItemType::MaterializedView {
675 sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
676 }
677 match is_replan {
678 true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
679 false => crate::plan::Explainee::MaterializedView(item.id()),
680 }
681 }
682 Explainee::Index(name) | Explainee::ReplanIndex(name) => {
683 let item = scx.get_item_by_resolved_name(&name)?;
684 let item_type = item.item_type();
685 if item_type != CatalogItemType::Index {
686 sql_bail!("Expected {name} to be an index, not a {item_type}");
687 }
688 match is_replan {
689 true => crate::plan::Explainee::ReplanIndex(item.id()),
690 false => crate::plan::Explainee::Index(item.id()),
691 }
692 }
693 Explainee::Select(select, broken) => {
694 let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
695 crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
696 }
697 Explainee::CreateView(mut stmt, broken) => {
698 if stmt.if_exists != IfExistsBehavior::Skip {
699 stmt.if_exists = IfExistsBehavior::Skip;
704 } else {
705 sql_bail!(
706 "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
707 (the behavior is implied within the scope of an enclosing EXPLAIN)"
708 );
709 }
710
711 let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
712 sql_bail!("expected CreateViewPlan plan");
713 };
714
715 crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
716 }
717 Explainee::CreateMaterializedView(mut stmt, broken) => {
718 if stmt.if_exists != IfExistsBehavior::Skip {
719 stmt.if_exists = IfExistsBehavior::Skip;
724 } else {
725 sql_bail!(
726 "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
727 (the behavior is implied within the scope of an enclosing EXPLAIN)"
728 );
729 }
730
731 let Plan::CreateMaterializedView(plan) =
732 ddl::plan_create_materialized_view(scx, *stmt)?
733 else {
734 sql_bail!("expected CreateMaterializedViewPlan plan");
735 };
736
737 crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
738 broken,
739 plan,
740 })
741 }
742 Explainee::CreateIndex(mut stmt, broken) => {
743 if !stmt.if_not_exists {
744 stmt.if_not_exists = true;
747 } else {
748 sql_bail!(
749 "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
750 (the behavior is implied within the scope of an enclosing EXPLAIN)"
751 );
752 }
753
754 let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
755 sql_bail!("expected CreateIndexPlan plan");
756 };
757
758 crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
759 }
760 };
761
762 Ok(explainee)
763}
764
765pub fn plan_explain_plan(
766 scx: &StatementContext,
767 explain: ExplainPlanStatement<Aug>,
768 params: &Params,
769) -> Result<Plan, PlanError> {
770 let (format, verbose_syntax) = match explain.format() {
771 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
772 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
773 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
774 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
775 };
776 let stage = explain.stage();
777
778 let mut config = {
780 let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
781
782 if !scx.catalog.system_vars().persist_stats_filter_enabled() {
783 with_options.filter_pushdown = Some(false);
785 }
786
787 ExplainConfig::try_from(with_options)?
788 };
789 config.verbose_syntax = verbose_syntax;
790
791 let explainee = plan_explainee(scx, explain.explainee, params)?;
792
793 Ok(Plan::ExplainPlan(ExplainPlanPlan {
794 stage,
795 format,
796 config,
797 explainee,
798 }))
799}
800
/// Plans an `EXPLAIN SCHEMA` statement for a `CREATE SINK`.
///
/// Plans the inner `CREATE SINK` (under a placeholder name) and extracts the
/// requested Avro key or value schema from the resulting Kafka sink
/// connection. Bails for non-Kafka or non-Avro sinks.
pub fn plan_explain_schema(
    scx: &StatementContext,
    explain_schema: ExplainSinkSchemaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let ExplainSinkSchemaStatement {
        schema_for,
        format: _,
        mut statement,
    } = explain_schema;

    // The unnamed CREATE SINK needs some name for planning; use a
    // placeholder in mz_catalog.
    statement.name = Some(UnresolvedItemName::qualified(&[
        ident!("mz_catalog"),
        ident!("mz_explain_schema"),
    ]));

    // Resolve avro doc-on options against the sink's source item, as CREATE
    // SINK purification would.
    crate::pure::purify_create_sink_avro_doc_on_options(
        scx.catalog,
        *statement.from.item_id(),
        &mut statement.format,
    )?;

    match ddl::plan_create_sink(scx, statement)? {
        // Only Kafka sinks whose value format is Avro carry a schema we can
        // explain; the nested pattern enforces that.
        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
            StorageSinkConnection::Kafka(KafkaSinkConnection {
                format:
                    KafkaSinkFormat {
                        key_format,
                        value_format:
                            KafkaSinkFormatType::Avro {
                                schema: value_schema,
                                ..
                            },
                        ..
                    },
                ..
            }) => {
                let schema = match schema_for {
                    // The key format is optional and may be non-Avro; both
                    // cases are errors for EXPLAIN KEY SCHEMA.
                    ExplainSinkSchemaFor::Key => key_format
                        .and_then(|f| match f {
                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
                            _ => None,
                        })
                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
                    ExplainSinkSchemaFor::Value => value_schema,
                };

                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
                    sink_from: sink.from,
                    json_schema: schema,
                }))
            }
            _ => bail_unsupported!(
                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
            ),
        },
        // `plan_create_sink` only ever produces `Plan::CreateSink`.
        _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
    }
}
863
864pub fn plan_explain_pushdown(
865 scx: &StatementContext,
866 statement: ExplainPushdownStatement<Aug>,
867 params: &Params,
868) -> Result<Plan, PlanError> {
869 scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
870 let explainee = plan_explainee(scx, statement.explainee, params)?;
871 Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
872}
873
874pub fn plan_explain_analyze_object(
875 scx: &StatementContext,
876 statement: ExplainAnalyzeObjectStatement<Aug>,
877 params: &Params,
878) -> Result<Plan, PlanError> {
879 let explainee_name = statement
880 .explainee
881 .name()
882 .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
883 .full_name_str();
884 let explainee = plan_explainee(scx, statement.explainee, params)?;
885
886 match explainee {
887 plan::Explainee::Index(_index_id) => (),
888 plan::Explainee::MaterializedView(_item_id) => (),
889 _ => {
890 return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
891 }
892 };
893
894 let mut ctes = Vec::with_capacity(4); let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
909 let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
910 let mut predicates = vec![format!("mo.name = '{}'", explainee_name)];
911 let mut order_by = vec!["mlm.lir_id DESC"];
912
913 match statement.properties {
914 ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
915 properties,
916 skew,
917 }) => {
918 let mut worker_id = None;
919 let mut seen_properties = BTreeSet::new();
920 for property in properties {
921 if !seen_properties.insert(property) {
923 continue;
924 }
925
926 match property {
927 ExplainAnalyzeComputationProperty::Memory => {
928 ctes.push((
929 "summary_memory",
930 r#"
931 SELECT mlm.global_id AS global_id,
932 mlm.lir_id AS lir_id,
933 SUM(mas.size) AS total_memory,
934 SUM(mas.records) AS total_records,
935 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
936 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
937 FROM mz_introspection.mz_lir_mapping mlm
938 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
939 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
940 ON (mas.operator_id = valid_id)
941GROUP BY mlm.global_id, mlm.lir_id"#,
942 ));
943 from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");
944
945 if skew {
946 ctes.push((
947 "per_worker_memory",
948 r#"
949 SELECT mlm.global_id AS global_id,
950 mlm.lir_id AS lir_id,
951 mas.worker_id AS worker_id,
952 SUM(mas.size) AS worker_memory,
953 SUM(mas.records) AS worker_records
954 FROM mz_introspection.mz_lir_mapping mlm
955 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
956 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
957 ON (mas.operator_id = valid_id)
958GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
959 ));
960 from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");
961
962 if let Some(worker_id) = worker_id {
963 predicates.push(format!("pwm.worker_id = {worker_id}"));
964 } else {
965 worker_id = Some("pwm.worker_id");
966 columns.push("pwm.worker_id AS worker_id");
967 order_by.push("worker_id");
968 }
969
970 columns.extend([
971 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
972 "pg_size_pretty(pwm.worker_memory) AS worker_memory",
973 "pg_size_pretty(sm.avg_memory) AS avg_memory",
974 "pg_size_pretty(sm.total_memory) AS total_memory",
975 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
976 "pwm.worker_records AS worker_records",
977 "sm.avg_records AS avg_records",
978 "sm.total_records AS total_records",
979 ]);
980 } else {
981 columns.extend([
982 "pg_size_pretty(sm.total_memory) AS total_memory",
983 "sm.total_records AS total_records",
984 ]);
985 }
986 }
987 ExplainAnalyzeComputationProperty::Cpu => {
988 ctes.push((
989 "summary_cpu",
990 r#"
991 SELECT mlm.global_id AS global_id,
992 mlm.lir_id AS lir_id,
993 SUM(mse.elapsed_ns) AS total_ns,
994 CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
995 FROM mz_introspection.mz_lir_mapping mlm
996 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
997 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
998 ON (mse.id = valid_id)
999GROUP BY mlm.global_id, mlm.lir_id"#,
1000 ));
1001 from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");
1002
1003 if skew {
1004 ctes.push((
1005 "per_worker_cpu",
1006 r#"
1007 SELECT mlm.global_id AS global_id,
1008 mlm.lir_id AS lir_id,
1009 mse.worker_id AS worker_id,
1010 SUM(mse.elapsed_ns) AS worker_ns
1011 FROM mz_introspection.mz_lir_mapping mlm
1012 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1013 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1014 ON (mse.id = valid_id)
1015GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1016 ));
1017 from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");
1018
1019 if let Some(worker_id) = worker_id {
1020 predicates.push(format!("pwc.worker_id = {worker_id}"));
1021 } else {
1022 worker_id = Some("pwc.worker_id");
1023 columns.push("pwc.worker_id AS worker_id");
1024 order_by.push("worker_id");
1025 }
1026
1027 columns.extend([
1028 "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
1029 "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
1030 "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
1031 ]);
1032 }
1033 columns.push(
1034 "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
1035 );
1036 }
1037 }
1038 }
1039 }
1040 ExplainAnalyzeProperty::Hints => {
1041 columns.extend([
1042 "megsa.levels AS levels",
1043 "megsa.to_cut AS to_cut",
1044 "megsa.hint AS hint",
1045 "pg_size_pretty(megsa.savings) AS savings",
1046 ]);
1047 from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
1048 "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
1049 mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
1050 }
1051 }
1052
1053 from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");
1054
1055 let ctes = if !ctes.is_empty() {
1056 format!(
1057 "WITH {}",
1058 separated(
1059 ",\n",
1060 ctes.iter()
1061 .map(|(name, defn)| format!("{name} AS ({defn})"))
1062 )
1063 )
1064 } else {
1065 String::new()
1066 };
1067 let columns = separated(", ", columns);
1068 let from = separated(" ", from);
1069 let predicates = separated(" AND ", predicates);
1070 let order_by = separated(", ", order_by);
1071 let query = format!(
1072 r#"{ctes}
1073SELECT {columns}
1074FROM {from}
1075WHERE {predicates}
1076ORDER BY {order_by}"#
1077 );
1078
1079 if statement.as_sql {
1080 let rows = vec![Row::pack_slice(&[Datum::String(
1081 &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1082 PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1083 })?,
1084 )])];
1085 let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1086
1087 Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1088 } else {
1089 let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1090 show_select.plan()
1091 }
1092}
1093
/// Plans `EXPLAIN ANALYZE <properties> [WITH SKEW]` for a whole cluster by
/// synthesizing a SQL query over `mz_introspection` relations.
///
/// The query is assembled piecewise: `ctes` collects `(name, definition)`
/// pairs, while `columns`, `from`, `predicates`, and `order_by` collect the
/// corresponding clause fragments. Each requested property (MEMORY, CPU)
/// contributes its own CTEs and output columns; `WITH SKEW` additionally
/// breaks results down per worker and reports max per-operator ratios.
///
/// If `statement.as_sql` is set, the pretty-printed SQL text is returned as a
/// one-row result instead of planning the synthesized query itself.
pub fn plan_explain_analyze_cluster(
    scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
    _params: &Params,
) -> Result<Plan, PlanError> {
    // Base output: one row per mappable object, keyed by name and global id.
    let mut ctes = Vec::with_capacity(4); let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
    let mut predicates = vec![];
    let mut order_by = vec![];

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
    // `worker_id` records which alias (if any) already exposes the worker id
    // column, so later skewed properties join against the same worker.
    let mut worker_id = None;
    let mut seen_properties = BTreeSet::new();
    for property in properties {
        // Ignore duplicate property mentions; only the first occurrence counts.
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            ExplainAnalyzeComputationProperty::Memory => {
                if skew {
                    // `set_worker_id` is true only when THIS branch introduced
                    // the worker_id output column (and thus owns its ordering).
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        // A previous property already exposes worker_id; align
                        // this property's rows to the same worker.
                        predicates.push(format!("om.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("om.worker_id");
                        columns.push("om.worker_id AS worker_id");
                        set_worker_id = true; };

                    // Per-(object, operator) totals and per-worker averages.
                    ctes.push((
                        "per_operator_memory_summary",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mas.size) AS total_memory,
       SUM(mas.records) AS total_records,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
       ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    // Per-(object, operator, worker) memory/record counts.
                    ctes.push((
                        "per_operator_memory_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mas.worker_id AS worker_id,
       SUM(mas.size) AS worker_memory,
       SUM(mas.records) AS worker_records
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
       ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                    ));

                    // Skew ratios: per-worker value over the cross-worker
                    // average (NULL when the average is zero or no worker id).
                    ctes.push((
                        "per_operator_memory_ratios",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.lir_id AS lir_id,
       pompw.worker_id AS worker_id,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
  FROM per_operator_memory_per_worker pompw
  JOIN per_operator_memory_summary poms
 USING (global_id, lir_id)
"#,
                    ));

                    // Roll operator-level numbers up to (object, worker),
                    // keeping the worst per-operator ratio for each worker.
                    ctes.push((
                        "object_memory",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.worker_id AS worker_id,
       MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
       MAX(pomr.records_ratio) AS max_operator_records_ratio,
       SUM(pompw.worker_memory) AS worker_memory,
       SUM(pompw.worker_records) AS worker_records
FROM per_operator_memory_per_worker pompw
     JOIN per_operator_memory_ratios pomr
    USING (global_id, worker_id, lir_id)
GROUP BY pompw.global_id, pompw.worker_id
"#,
                    ));

                    // Object-level totals and per-worker averages.
                    ctes.push(("object_average_memory", r#"
SELECT om.global_id AS global_id,
       SUM(om.worker_memory) AS total_memory,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
       SUM(om.worker_records) AS total_records,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
  FROM object_memory om
GROUP BY om.global_id"#));

                    from.push("LEFT JOIN object_memory om USING (global_id)");
                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");

                    columns.extend([
                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
                        "pg_size_pretty(om.worker_memory) AS worker_memory",
                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
                        "pg_size_pretty(oam.total_memory) AS total_memory",
                        "om.max_operator_records_ratio AS max_operator_records_ratio",
                        "om.worker_records AS worker_records",
                        "oam.avg_records AS avg_records",
                        "oam.total_records AS total_records",
                    ]);

                    // Most-skewed / heaviest objects first.
                    order_by.extend([
                        "max_operator_memory_ratio DESC",
                        "max_operator_records_ratio DESC",
                        "worker_memory DESC",
                        "worker_records DESC",
                    ]);

                    // Only the property that introduced worker_id orders by it.
                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // No skew: only object-level totals are needed.
                    ctes.push((
                        "per_operator_memory_totals",
                        r#"
    SELECT mlm.global_id AS global_id,
           mlm.lir_id AS lir_id,
           SUM(mas.size) AS total_memory,
           SUM(mas.records) AS total_records
      FROM mz_introspection.mz_lir_mapping mlm
           CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
           JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
             ON (mas.operator_id = valid_id)
    GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_memory_totals",
                        r#"
SELECT pomt.global_id AS global_id,
       SUM(pomt.total_memory) AS total_memory,
       SUM(pomt.total_records) AS total_records
FROM per_operator_memory_totals pomt
GROUP BY pomt.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
                    columns.extend([
                        "pg_size_pretty(omt.total_memory) AS total_memory",
                        "omt.total_records AS total_records",
                    ]);
                    order_by.extend(["total_memory DESC", "total_records DESC"]);
                }
            }
            ExplainAnalyzeComputationProperty::Cpu => {
                // Mirrors the MEMORY arm above, but sourced from
                // mz_scheduling_elapsed_per_worker (elapsed nanoseconds).
                if skew {
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        predicates.push(format!("oc.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("oc.worker_id");
                        columns.push("oc.worker_id AS worker_id");
                        set_worker_id = true; };

                    ctes.push((
                        "per_operator_cpu_summary",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mse.elapsed_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
FROM mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
       ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
));

                    ctes.push((
                        "per_operator_cpu_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mse.worker_id AS worker_id,
       SUM(mse.elapsed_ns) AS worker_ns
FROM mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
       ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                    ));

                    ctes.push((
                        "per_operator_cpu_ratios",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.lir_id AS lir_id,
       pocpw.worker_id AS worker_id,
       CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
FROM per_operator_cpu_per_worker pocpw
     JOIN per_operator_cpu_summary pocs
    USING (global_id, lir_id)
"#,
                    ));

                    ctes.push((
                        "object_cpu",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.worker_id AS worker_id,
       MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
       SUM(pocpw.worker_ns) AS worker_ns
FROM per_operator_cpu_per_worker pocpw
     JOIN per_operator_cpu_ratios pomr
    USING (global_id, worker_id, lir_id)
GROUP BY pocpw.global_id, pocpw.worker_id
"#,
                    ));

                    ctes.push((
                        "object_average_cpu",
                        r#"
SELECT oc.global_id AS global_id,
       SUM(oc.worker_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
  FROM object_cpu oc
GROUP BY oc.global_id"#,));

                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");

                    // Nanosecond sums are rendered as intervals (ns / 1000 = us).
                    columns.extend([
                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
                    ]);

                    order_by.extend(["max_operator_cpu_ratio DESC", "worker_elapsed DESC"]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    ctes.push((
                        "per_operator_cpu_totals",
                        r#"
    SELECT mlm.global_id AS global_id,
           mlm.lir_id AS lir_id,
           SUM(mse.elapsed_ns) AS total_ns
      FROM mz_introspection.mz_lir_mapping mlm
           CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
           JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
             ON (mse.id = valid_id)
    GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_cpu_totals",
                        r#"
SELECT poct.global_id AS global_id,
       SUM(poct.total_ns) AS total_ns
FROM per_operator_cpu_totals poct
GROUP BY poct.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
                    columns
                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
                    order_by.extend(["total_elapsed DESC"]);
                }
            }
        }
    }

    // Stitch the collected fragments into a single SQL string.
    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    // Unlike columns/from, the WHERE clause may be entirely absent.
    let predicates = if !predicates.is_empty() {
        format!("WHERE {}", separated(" AND ", predicates))
    } else {
        String::new()
    };
    // Always break ties by object name, last.
    order_by.push("mo.name DESC");
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
{predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // AS SQL: return the generated query text (pretty-printed) rather
        // than planning it. A parse failure here is a bug in our generator.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        // Plan the generated query as if the user had typed it.
        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        show_select.plan()
    }
}
1459
1460pub fn plan_explain_timestamp(
1461 scx: &StatementContext,
1462 explain: ExplainTimestampStatement<Aug>,
1463) -> Result<Plan, PlanError> {
1464 let (format, _verbose_syntax) = match explain.format() {
1465 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1466 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1467 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1468 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1469 };
1470
1471 let raw_plan = {
1472 let query::PlannedRootQuery {
1473 expr: raw_plan,
1474 desc: _,
1475 finishing: _,
1476 scope: _,
1477 } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1478 if raw_plan.contains_parameters()? {
1479 return Err(PlanError::ParameterNotAllowed(
1480 "EXPLAIN TIMESTAMP".to_string(),
1481 ));
1482 }
1483
1484 raw_plan
1485 };
1486 let when = query::plan_as_of(scx, explain.select.as_of)?;
1487
1488 Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1489 format,
1490 raw_plan,
1491 when,
1492 }))
1493}
1494
1495#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."]
1498pub fn plan_query(
1499 scx: &StatementContext,
1500 query: Query<Aug>,
1501 params: &Params,
1502 lifetime: QueryLifetime,
1503) -> Result<query::PlannedRootQuery<MirRelationExpr>, PlanError> {
1504 let query::PlannedRootQuery {
1505 mut expr,
1506 desc,
1507 finishing,
1508 scope,
1509 } = query::plan_root_query(scx, query, lifetime)?;
1510 expr.bind_parameters(scx, lifetime, params)?;
1511
1512 Ok(query::PlannedRootQuery {
1513 expr: expr.lower(scx.catalog.system_vars(), None)?,
1515 desc,
1516 finishing,
1517 scope,
1518 })
1519}
1520
// Generates `SubscribeOptionExtracted`, pulling the boolean `SNAPSHOT` and
// `PROGRESS` options out of a `SUBSCRIBE ... WITH (...)` clause.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1522
/// Computes the output description of a `SUBSCRIBE` statement.
///
/// The result always begins with an `mz_timestamp` column, followed by
/// `mz_progressed` when the `PROGRESS` option is set, then output-mode
/// specific columns (`mz_diff` or `mz_state`) and the subscribed relation's
/// columns. Column order here must match what the subscribe actually emits.
pub fn describe_subscribe(
    scx: &StatementContext,
    stmt: SubscribeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // Resolve the description of the subscribed relation: either a catalog
    // item that has one, or the planned description of an inline query.
    let relation_desc = match stmt.relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            match item.relation_desc() {
                Some(desc) => desc.into_owned(),
                None => sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                ),
            }
        }
        SubscribeRelation::Query(query) => {
            let query::PlannedRootQuery { desc, .. } =
                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
            desc
        }
    };
    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
    let progress = progress.unwrap_or(false);
    // Every subscribe starts with a scale-0 numeric timestamp column.
    let mut desc = RelationDesc::builder().with_column(
        "mz_timestamp",
        SqlScalarType::Numeric {
            max_scale: Some(NumericMaxScale::ZERO),
        }
        .nullable(false),
    );
    if progress {
        desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
    }

    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
    match stmt.output {
        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
            desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
            for (name, mut ty) in relation_desc.into_iter() {
                // Progress rows carry no data, so data columns become nullable.
                if progress {
                    ty.nullable = true;
                }
                desc = desc.with_column(name, ty);
            }
        }
        SubscribeOutput::EnvelopeUpsert { key_columns }
        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
            desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
            let key_columns = key_columns
                .into_iter()
                .map(normalize::column_name)
                .collect_vec();
            let mut before_values_desc = RelationDesc::builder();
            let mut after_values_desc = RelationDesc::builder();

            // Key columns come first, in the order the user listed them; an
            // unknown key column is an error.
            for column_name in &key_columns {
                let mut column_ty = relation_desc
                    .get_by_name(column_name)
                    .map(|(_pos, ty)| ty.clone())
                    .ok_or_else(|| PlanError::UnknownColumn {
                        table: None,
                        column: column_name.clone(),
                        similar: Box::new([]),
                    })?;
                if progress {
                    column_ty.nullable = true;
                }
                desc = desc.with_column(column_name, column_ty);
            }

            // Non-key columns: always nullable (absent for deletes). Debezium
            // gets `before_*`/`after_*` pairs; upsert keeps the bare names.
            for (mut name, mut ty) in relation_desc
                .into_iter()
                .filter(|(name, _ty)| !key_columns.contains(name))
            {
                ty.nullable = true;
                before_values_desc =
                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
                if debezium {
                    name = format!("after_{}", name).into();
                }
                after_values_desc = after_values_desc.with_column(name, ty);
            }

            if debezium {
                desc = desc.concat(before_values_desc);
            }
            desc = desc.concat(after_values_desc);
        }
    }
    Ok(StatementDesc::new(Some(desc.finish())))
}
1618
/// Plans a `SUBSCRIBE` statement into a `SubscribePlan`.
///
/// Resolves the subscribed relation (catalog item or inline query), plans the
/// `AS OF` / `UP TO` bounds, and plans the output mode (diffs, upsert
/// envelope, debezium envelope, or within-timestamp ordering), including any
/// key/order-by expressions against the relation's columns.
pub fn plan_subscribe(
    scx: &StatementContext,
    SubscribeStatement {
        relation,
        options,
        as_of,
        up_to,
        output,
    }: SubscribeStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<Plan, PlanError> {
    let (from, desc, scope) = match relation {
        SubscribeRelation::Name(name) => {
            // Subscribing to a catalog item requires it to have a relation
            // description (i.e., to be relation-shaped).
            let item = scx.get_item_by_resolved_name(&name)?;
            let Some(desc) = item.relation_desc() else {
                sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                );
            };
            let item_name = match name {
                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
                _ => None,
            };
            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
            (
                SubscribeFrom::Id(item.global_id()),
                desc.into_owned(),
                scope,
            )
        }
        SubscribeRelation::Query(query) => {
            // Deprecated path deliberately retained: subscribe queries are
            // lowered to MIR here.
            #[allow(deprecated)] let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?;
            // Subscribe queries are expected to carry no row-set finishing
            // (no ORDER BY/LIMIT applied at this level); broken invariant = bug.
            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
                &query.finishing,
                query.desc.arity()
            ));
            let desc = query.desc.clone();
            (
                SubscribeFrom::Query {
                    expr: query.expr,
                    desc: query.desc,
                },
                desc,
                query.scope,
            )
        }
    };

    let when = query::plan_as_of(scx, as_of)?;
    let up_to = up_to
        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
        .transpose()?;

    // Expression context for planning KEY / ORDER BY clauses against the
    // subscribed relation; aggregates and window functions are not allowed.
    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
    let ecx = ExprContext {
        qcx: &qcx,
        name: "",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
    let output = match output {
        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
        SubscribeOutput::EnvelopeUpsert { key_columns } => {
            // Key columns are planned as ascending ORDER BY expressions; they
            // must resolve to plain columns (no computed expressions).
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE UPSERT KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            // Non-empty map_exprs means a key required computing a new
            // expression, which is not a valid upsert key.
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
            }
            plan::SubscribeOutput::EnvelopeUpsert {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::EnvelopeDebezium { key_columns } => {
            // Same key handling as upsert, but feature-gated.
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE DEBEZIUM KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
            }
            plan::SubscribeOutput::EnvelopeDebezium {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
            // The mz_diff column is addressable in this ORDER BY; prepend it
            // at index 0 and shift the relation's columns by one.
            let mz_diff = "mz_diff".into();
            let output_columns = std::iter::once((0, &mz_diff))
                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
                .collect_vec();
            match query::plan_order_by_exprs(
                &ExprContext {
                    name: "WITHIN TIMESTAMP ORDER BY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            ) {
                // An unknown-column error specifically about mz_diff is
                // reported as an invalid ORDER BY rather than unknown column.
                Err(PlanError::UnknownColumn {
                    table: None,
                    column,
                    similar: _,
                }) if &column == &mz_diff => {
                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                }
                Err(e) => return Err(e),
                Ok((order_by, map_exprs)) => {
                    // Computed ORDER BY expressions are not supported here.
                    if !map_exprs.is_empty() {
                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                    }

                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
                }
            }
        }
    };

    let SubscribeOptionExtracted {
        progress, snapshot, ..
    } = options.try_into()?;
    Ok(Plan::Subscribe(SubscribePlan {
        from,
        when,
        up_to,
        // Defaults: snapshot on, progress off.
        with_snapshot: snapshot.unwrap_or(true),
        copy_to,
        emit_progress: progress.unwrap_or(false),
        output,
    }))
}
1791
1792pub fn describe_copy_from_table(
1793 scx: &StatementContext,
1794 table_name: <Aug as AstInfo>::ItemName,
1795 columns: Vec<Ident>,
1796) -> Result<StatementDesc, PlanError> {
1797 let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1798 Ok(StatementDesc::new(Some(desc)))
1799}
1800
1801pub fn describe_copy_item(
1802 scx: &StatementContext,
1803 object_name: <Aug as AstInfo>::ItemName,
1804 columns: Vec<Ident>,
1805) -> Result<StatementDesc, PlanError> {
1806 let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1807 Ok(StatementDesc::new(Some(desc)))
1808}
1809
1810pub fn describe_copy(
1811 scx: &StatementContext,
1812 CopyStatement {
1813 relation,
1814 direction,
1815 ..
1816 }: CopyStatement<Aug>,
1817) -> Result<StatementDesc, PlanError> {
1818 Ok(match (relation, direction) {
1819 (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1820 describe_copy_item(scx, name, columns)?
1821 }
1822 (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1823 describe_copy_from_table(scx, name, columns)?
1824 }
1825 (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1826 (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1827 }
1828 .with_is_copy())
1829}
1830
/// Plans `COPY ... TO <expr>` (copy-to-S3) for an already-planned SELECT.
///
/// Requires an AWS connection, validates the requested format (CSV or
/// Parquet), plans the target expression as a string, and bounds the
/// `MAX FILE SIZE` option before assembling the `CopyToPlan`.
fn plan_copy_to_expr(
    scx: &StatementContext,
    select_plan: SelectPlan,
    desc: RelationDesc,
    to: &Expr<Aug>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // An AWS CONNECTION option is mandatory, and the referenced item must
    // actually be an AWS connection.
    let conn_id = match options.aws_connection {
        Some(conn_id) => CatalogItemId::from(conn_id),
        None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
    };
    let connection = scx.get_item(&conn_id).connection()?;

    match connection {
        mz_storage_types::connections::Connection::Aws(_) => {}
        _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
    }

    let format = match format {
        CopyFormat::Csv => {
            // Each of these options must be a single one-byte character.
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            S3SinkFormat::PgCopy(CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            ))
        }
        CopyFormat::Parquet => {
            // Ensure every output column type is representable in Arrow.
            ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
            S3SinkFormat::Parquet
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
    };

    // Plan the target expression in an empty scope (it may not reference any
    // columns, aggregates, subqueries, parameters, or window functions) and
    // coerce it to a string.
    let mut to_expr = to.clone();
    transform_ast::transform(scx, &mut to_expr)?;
    let relation_type = RelationDesc::empty();
    let ecx = &ExprContext {
        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
        name: "COPY TO target",
        scope: &Scope::empty(),
        relation_type: relation_type.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };

    let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;

    // MAX FILE SIZE must fall within the supported S3 sink bounds.
    if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
        sql_bail!(
            "MAX FILE SIZE cannot be less than {}",
            MIN_S3_SINK_FILE_SIZE
        );
    }
    if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
        sql_bail!(
            "MAX FILE SIZE cannot be greater than {}",
            MAX_S3_SINK_FILE_SIZE
        );
    }

    Ok(Plan::CopyTo(CopyToPlan {
        select_plan,
        desc,
        to,
        connection: connection.to_owned(),
        connection_id: conn_id,
        format,
        max_file_size: options.max_file_size.as_bytes(),
    }))
}
1915
/// Plans `COPY <table> FROM <source>` into a `CopyFromPlan`.
///
/// The source may be STDIN, a URL expression, or (with an AWS connection) an
/// S3 location; the data format may be TEXT, CSV, or Parquet. `FILES`/
/// `PATTERN` filters are mutually exclusive and only valid for non-STDIN
/// sources.
fn plan_copy_from(
    scx: &StatementContext,
    target: &CopyTarget<Aug>,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // Reject CSV-only options when they were supplied in a non-CSV mode.
    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
        match option {
            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
            None => Ok(()),
        }
    }

    let source = match target {
        CopyTarget::Stdin => CopyFromSource::Stdin,
        CopyTarget::Expr(from) => {
            // Copying from a remote location is feature-gated.
            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;

            // Plan the source expression in an empty scope (no columns,
            // aggregates, subqueries, parameters, or windows) as a string.
            let mut from_expr = from.clone();
            transform_ast::transform(scx, &mut from_expr)?;
            let relation_type = RelationDesc::empty();
            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "COPY FROM target",
                scope: &Scope::empty(),
                relation_type: relation_type.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;

            // With an AWS CONNECTION the expression is an S3 URI; without
            // one it is treated as a plain URL.
            match options.aws_connection {
                Some(conn_id) => {
                    let conn_id = CatalogItemId::from(conn_id);

                    let connection = match scx.get_item(&conn_id).connection()? {
                        mz_storage_types::connections::Connection::Aws(conn) => conn,
                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
                    };

                    CopyFromSource::AwsS3 {
                        uri: from,
                        connection,
                        connection_id: conn_id,
                    }
                }
                None => CopyFromSource::Url(from),
            }
        }
        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
    };

    let params = match format {
        CopyFormat::Text => {
            only_available_with_csv(options.quote, "quote")?;
            only_available_with_csv(options.escape, "escape")?;
            only_available_with_csv(options.header, "HEADER")?;
            // TEXT defaults: tab delimiter, `\N` as the NULL sentinel.
            let delimiter =
                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
            let null = match options.null {
                Some(null) => Cow::from(null),
                None => Cow::from("\\N"),
            };
            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
        }
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            )
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Parquet => CopyFormatParams::Parquet,
    };

    // FILES and PATTERN are mutually exclusive ways to select source objects.
    let filter = match (options.files, options.pattern) {
        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
        (Some(files), None) => Some(CopyFromFilter::Files(files)),
        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
        (None, None) => None,
    };

    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
    }

    let table_name_string = table_name.full_name_str();

    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;

    // COPY FROM relies on the MFP produced by planning; its absence is a bug.
    let Some(mfp) = maybe_mfp else {
        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
    };

    Ok(Plan::CopyFrom(CopyFromPlan {
        target_id: id,
        target_name: table_name_string,
        source,
        columns,
        source_desc,
        mfp,
        params,
        filter,
    }))
}
2036
2037fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2038 match v {
2039 Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2040 Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2041 None => Ok(None),
2042 }
2043}
2044
// Generates `CopyOptionExtracted`, pulling the COPY statement's WITH options
// (format, CSV parameters, AWS connection, file sizing, and FILES/PATTERN
// source filters) into a typed struct. MAX FILE SIZE defaults to 256 MB.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
2058
2059pub fn plan_copy(
2060 scx: &StatementContext,
2061 CopyStatement {
2062 relation,
2063 direction,
2064 target,
2065 options,
2066 }: CopyStatement<Aug>,
2067) -> Result<Plan, PlanError> {
2068 let options = CopyOptionExtracted::try_from(options)?;
2069 let format = options
2072 .format
2073 .as_ref()
2074 .map(|format| match format.to_lowercase().as_str() {
2075 "text" => Ok(CopyFormat::Text),
2076 "csv" => Ok(CopyFormat::Csv),
2077 "binary" => Ok(CopyFormat::Binary),
2078 "parquet" => Ok(CopyFormat::Parquet),
2079 _ => sql_bail!("unknown FORMAT: {}", format),
2080 })
2081 .transpose()?;
2082
2083 match (&direction, &target) {
2084 (CopyDirection::To, CopyTarget::Stdout) => {
2085 if options.delimiter.is_some() {
2086 sql_bail!("COPY TO does not support DELIMITER option yet");
2087 }
2088 if options.quote.is_some() {
2089 sql_bail!("COPY TO does not support QUOTE option yet");
2090 }
2091 if options.null.is_some() {
2092 sql_bail!("COPY TO does not support NULL option yet");
2093 }
2094 match relation {
2095 CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
2096 CopyRelation::Select(stmt) => Ok(plan_select(
2097 scx,
2098 stmt,
2099 &Params::empty(),
2100 Some(format.unwrap_or(CopyFormat::Text)),
2101 )?),
2102 CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
2103 scx,
2104 stmt,
2105 &Params::empty(),
2106 Some(format.unwrap_or(CopyFormat::Text)),
2107 )?),
2108 }
2109 }
2110 (CopyDirection::From, target) => match relation {
2111 CopyRelation::Named { name, columns } => plan_copy_from(
2112 scx,
2113 target,
2114 name,
2115 columns,
2116 format.unwrap_or(CopyFormat::Text),
2117 options,
2118 ),
2119 _ => sql_bail!("COPY FROM {} not supported", target),
2120 },
2121 (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
2122 if !scx.catalog.active_role_id().is_system() {
2127 scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
2128 }
2129
2130 let format = match format {
2131 Some(inner) => inner,
2132 _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
2133 };
2134
2135 let stmt = match relation {
2136 CopyRelation::Named { name, columns } => {
2137 if !columns.is_empty() {
2138 sql_bail!(
2140 "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
2141 );
2142 }
2143 let query = Query {
2145 ctes: CteBlock::empty(),
2146 body: SetExpr::Table(name),
2147 order_by: vec![],
2148 limit: None,
2149 offset: None,
2150 };
2151 SelectStatement { query, as_of: None }
2152 }
2153 CopyRelation::Select(stmt) => {
2154 if !stmt.query.order_by.is_empty() {
2155 sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
2156 }
2157 stmt
2158 }
2159 _ => sql_bail!("COPY {} {} not supported", direction, target),
2160 };
2161
2162 let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
2163 plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
2164 }
2165 _ => sql_bail!("COPY {} {} not supported", direction, target),
2166 }
2167}