1use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19
20use mz_arrow_util::builder::ArrowBuilder;
21use mz_expr::visit::Visit;
22use mz_expr::{MirRelationExpr, RowSetFinishing};
23use mz_ore::num::NonNeg;
24use mz_ore::soft_panic_or_log;
25use mz_ore::str::separated;
26use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
27use mz_repr::adt::numeric::NumericMaxScale;
28use mz_repr::bytes::ByteSize;
29use mz_repr::explain::{ExplainConfig, ExplainFormat};
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
32use mz_sql_parser::ast::{
33 CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
34 ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
35 ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
36 ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
37 SetExpr, SubscribeOutput, UnresolvedItemName,
38};
39use mz_sql_parser::ident;
40use mz_storage_types::sinks::{
41 KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
42 MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
43};
44
45use crate::ast::display::AstDisplay;
46use crate::ast::{
47 AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
48 DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
49 SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
50 UpdateStatement,
51};
52use crate::catalog::CatalogItemType;
53use crate::names::{Aug, ResolvedItemName};
54use crate::normalize;
55use crate::plan::query::{
56 ExprContext, QueryLifetime, offset_into_value, plan_as_of_or_up_to, plan_expr,
57};
58use crate::plan::scope::Scope;
59use crate::plan::statement::show::ShowSelect;
60use crate::plan::statement::{StatementContext, StatementDesc, ddl};
61use crate::plan::{
62 self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
63 ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
64};
65use crate::plan::{
66 CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
67 QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
68};
69use crate::plan::{CopyFromSource, with_options};
70use crate::session::vars::{self, ENABLE_COPY_FROM_REMOTE};
71
72pub fn describe_insert(
79 scx: &StatementContext,
80 InsertStatement {
81 table_name,
82 columns,
83 source,
84 returning,
85 }: InsertStatement<Aug>,
86) -> Result<StatementDesc, PlanError> {
87 let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
88 let desc = if returning.expr.is_empty() {
89 None
90 } else {
91 Some(returning.desc)
92 };
93 Ok(StatementDesc::new(desc))
94}
95
96pub fn plan_insert(
97 scx: &StatementContext,
98 InsertStatement {
99 table_name,
100 columns,
101 source,
102 returning,
103 }: InsertStatement<Aug>,
104 params: &Params,
105) -> Result<Plan, PlanError> {
106 let (id, mut expr, returning) =
107 query::plan_insert_query(scx, table_name, columns, source, returning)?;
108 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
109 let returning = returning
110 .expr
111 .into_iter()
112 .map(|mut expr| {
113 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
114 expr.lower_uncorrelated()
115 })
116 .collect::<Result<Vec<_>, _>>()?;
117
118 Ok(Plan::Insert(InsertPlan {
119 id,
120 values: expr,
121 returning,
122 }))
123}
124
125pub fn describe_delete(
126 scx: &StatementContext,
127 stmt: DeleteStatement<Aug>,
128) -> Result<StatementDesc, PlanError> {
129 query::plan_delete_query(scx, stmt)?;
130 Ok(StatementDesc::new(None))
131}
132
133pub fn plan_delete(
134 scx: &StatementContext,
135 stmt: DeleteStatement<Aug>,
136 params: &Params,
137) -> Result<Plan, PlanError> {
138 let rtw_plan = query::plan_delete_query(scx, stmt)?;
139 plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
140}
141
142pub fn describe_update(
143 scx: &StatementContext,
144 stmt: UpdateStatement<Aug>,
145) -> Result<StatementDesc, PlanError> {
146 query::plan_update_query(scx, stmt)?;
147 Ok(StatementDesc::new(None))
148}
149
150pub fn plan_update(
151 scx: &StatementContext,
152 stmt: UpdateStatement<Aug>,
153 params: &Params,
154) -> Result<Plan, PlanError> {
155 let rtw_plan = query::plan_update_query(scx, stmt)?;
156 plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
157}
158
159pub fn plan_read_then_write(
160 scx: &StatementContext,
161 kind: MutationKind,
162 params: &Params,
163 query::ReadThenWritePlan {
164 id,
165 mut selection,
166 finishing,
167 assignments,
168 }: query::ReadThenWritePlan,
169) -> Result<Plan, PlanError> {
170 selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
171 let mut assignments_outer = BTreeMap::new();
172 for (idx, mut set) in assignments {
173 set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
174 let set = set.lower_uncorrelated()?;
175 assignments_outer.insert(idx, set);
176 }
177
178 Ok(Plan::ReadThenWrite(ReadThenWritePlan {
179 id,
180 selection,
181 finishing,
182 assignments: assignments_outer,
183 kind,
184 returning: Vec::new(),
185 }))
186}
187
188pub fn describe_select(
189 scx: &StatementContext,
190 stmt: SelectStatement<Aug>,
191) -> Result<StatementDesc, PlanError> {
192 if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
193 return Ok(StatementDesc::new(Some(desc)));
194 }
195
196 let query::PlannedRootQuery { desc, .. } =
197 query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
198 Ok(StatementDesc::new(Some(desc)))
199}
200
201pub fn plan_select(
202 scx: &StatementContext,
203 select: SelectStatement<Aug>,
204 params: &Params,
205 copy_to: Option<CopyFormat>,
206) -> Result<Plan, PlanError> {
207 if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
208 return Ok(Plan::SideEffectingFunc(f));
209 }
210
211 let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
212 Ok(Plan::Select(plan))
213}
214
215fn plan_select_inner(
216 scx: &StatementContext,
217 select: SelectStatement<Aug>,
218 params: &Params,
219 copy_to: Option<CopyFormat>,
220) -> Result<(SelectPlan, RelationDesc), PlanError> {
221 let when = query::plan_as_of(scx, select.as_of.clone())?;
222 let lifetime = QueryLifetime::OneShot;
223 let query::PlannedRootQuery {
224 mut expr,
225 desc,
226 finishing,
227 scope: _,
228 } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
229 expr.bind_parameters(scx, lifetime, params)?;
230
231 expr.try_visit_mut_pre(&mut |expr| {
234 if let HirRelationExpr::TopK { offset, .. } = expr {
235 let offset_value = offset_into_value(offset.take())?;
236 *offset = HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64);
237 }
238 Ok::<(), PlanError>(())
239 })?;
240 let limit = match finishing.limit {
250 None => None,
251 Some(mut limit) => {
252 limit.bind_parameters(scx, lifetime, params)?;
253 let Some(limit) = limit.as_literal() else {
255 sql_bail!(
256 "Top-level LIMIT must be a constant expression, got {}",
257 limit
258 )
259 };
260 match limit {
261 Datum::Null => None,
262 Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
263 _ => {
264 soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
265 sql_bail!("LIMIT must be a non-negative INT or NULL")
266 }
267 }
268 }
269 };
270 let offset = {
271 let mut offset = finishing.offset.clone();
272 offset.bind_parameters(scx, lifetime, params)?;
273 let offset = offset_into_value(offset.take())?;
274 offset
275 .try_into()
276 .expect("checked in offset_into_value that it is not negative")
277 };
278
279 let plan = SelectPlan {
280 source: expr,
281 when,
282 finishing: RowSetFinishing {
283 limit,
284 offset,
285 project: finishing.project,
286 order_by: finishing.order_by,
287 },
288 copy_to,
289 select: Some(Box::new(select)),
290 };
291
292 Ok((plan, desc))
293}
294
295pub fn describe_explain_plan(
296 scx: &StatementContext,
297 explain: ExplainPlanStatement<Aug>,
298) -> Result<StatementDesc, PlanError> {
299 let mut relation_desc = RelationDesc::builder();
300
301 match explain.stage() {
302 ExplainStage::RawPlan => {
303 let name = "Raw Plan";
304 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
305 }
306 ExplainStage::DecorrelatedPlan => {
307 let name = "Decorrelated Plan";
308 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
309 }
310 ExplainStage::LocalPlan => {
311 let name = "Locally Optimized Plan";
312 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
313 }
314 ExplainStage::GlobalPlan => {
315 let name = "Optimized Plan";
316 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
317 }
318 ExplainStage::PhysicalPlan => {
319 let name = "Physical Plan";
320 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
321 }
322 ExplainStage::Trace => {
323 relation_desc = relation_desc
324 .with_column("Time", SqlScalarType::UInt64.nullable(false))
325 .with_column("Path", SqlScalarType::String.nullable(false))
326 .with_column("Plan", SqlScalarType::String.nullable(false));
327 }
328 ExplainStage::PlanInsights => {
329 let name = "Plan Insights";
330 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
331 }
332 };
333 let relation_desc = relation_desc.finish();
334
335 Ok(
336 StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
337 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
338 _ => vec![],
339 }),
340 )
341}
342
343pub fn describe_explain_pushdown(
344 scx: &StatementContext,
345 statement: ExplainPushdownStatement<Aug>,
346) -> Result<StatementDesc, PlanError> {
347 let relation_desc = RelationDesc::builder()
348 .with_column("Source", SqlScalarType::String.nullable(false))
349 .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
350 .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
351 .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
352 .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
353 .finish();
354
355 Ok(
356 StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
357 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
358 _ => vec![],
359 }),
360 )
361}
362
363pub fn describe_explain_analyze_object(
364 _scx: &StatementContext,
365 statement: ExplainAnalyzeObjectStatement<Aug>,
366) -> Result<StatementDesc, PlanError> {
367 if statement.as_sql {
368 let relation_desc = RelationDesc::builder()
369 .with_column("SQL", SqlScalarType::String.nullable(false))
370 .finish();
371 return Ok(StatementDesc::new(Some(relation_desc)));
372 }
373
374 match statement.properties {
375 ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
376 properties,
377 skew,
378 }) => {
379 let mut relation_desc = RelationDesc::builder()
380 .with_column("operator", SqlScalarType::String.nullable(false));
381
382 if skew {
383 relation_desc =
384 relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
385 }
386
387 let mut seen_properties = BTreeSet::new();
388 for property in properties {
389 if !seen_properties.insert(property) {
391 continue;
392 }
393
394 match property {
395 ExplainAnalyzeComputationProperty::Memory if skew => {
396 let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
397 relation_desc = relation_desc
398 .with_column("memory_ratio", numeric.clone())
399 .with_column("worker_memory", SqlScalarType::String.nullable(true))
400 .with_column("avg_memory", SqlScalarType::String.nullable(true))
401 .with_column("total_memory", SqlScalarType::String.nullable(true))
402 .with_column("records_ratio", numeric.clone())
403 .with_column("worker_records", numeric.clone())
404 .with_column("avg_records", numeric.clone())
405 .with_column("total_records", numeric);
406 }
407 ExplainAnalyzeComputationProperty::Memory => {
408 relation_desc = relation_desc
409 .with_column("total_memory", SqlScalarType::String.nullable(true))
410 .with_column(
411 "total_records",
412 SqlScalarType::Numeric { max_scale: None }.nullable(true),
413 );
414 }
415 ExplainAnalyzeComputationProperty::Cpu => {
416 if skew {
417 relation_desc = relation_desc
418 .with_column(
419 "cpu_ratio",
420 SqlScalarType::Numeric { max_scale: None }.nullable(true),
421 )
422 .with_column(
423 "worker_elapsed",
424 SqlScalarType::Interval.nullable(true),
425 )
426 .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
427 }
428 relation_desc = relation_desc
429 .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
430 }
431 }
432 }
433
434 let relation_desc = relation_desc.finish();
435 Ok(StatementDesc::new(Some(relation_desc)))
436 }
437 ExplainAnalyzeProperty::Hints => {
438 let relation_desc = RelationDesc::builder()
439 .with_column("operator", SqlScalarType::String.nullable(true))
440 .with_column("levels", SqlScalarType::Int64.nullable(true))
441 .with_column("to_cut", SqlScalarType::Int64.nullable(true))
442 .with_column("hint", SqlScalarType::Float64.nullable(true))
443 .with_column("savings", SqlScalarType::String.nullable(true))
444 .finish();
445 Ok(StatementDesc::new(Some(relation_desc)))
446 }
447 }
448}
449
450pub fn describe_explain_analyze_cluster(
451 _scx: &StatementContext,
452 statement: ExplainAnalyzeClusterStatement,
453) -> Result<StatementDesc, PlanError> {
454 if statement.as_sql {
455 let relation_desc = RelationDesc::builder()
456 .with_column("SQL", SqlScalarType::String.nullable(false))
457 .finish();
458 return Ok(StatementDesc::new(Some(relation_desc)));
459 }
460
461 let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
462
463 let mut relation_desc = RelationDesc::builder()
464 .with_column("object", SqlScalarType::String.nullable(false))
465 .with_column("global_id", SqlScalarType::String.nullable(false));
466
467 if skew {
468 relation_desc =
469 relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
470 }
471
472 let mut seen_properties = BTreeSet::new();
473 for property in properties {
474 if !seen_properties.insert(property) {
476 continue;
477 }
478
479 match property {
480 ExplainAnalyzeComputationProperty::Memory if skew => {
481 let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
482 relation_desc = relation_desc
483 .with_column("max_operator_memory_ratio", numeric.clone())
484 .with_column("worker_memory", SqlScalarType::String.nullable(true))
485 .with_column("avg_memory", SqlScalarType::String.nullable(true))
486 .with_column("total_memory", SqlScalarType::String.nullable(true))
487 .with_column("max_operator_records_ratio", numeric.clone())
488 .with_column("worker_records", numeric.clone())
489 .with_column("avg_records", numeric.clone())
490 .with_column("total_records", numeric);
491 }
492 ExplainAnalyzeComputationProperty::Memory => {
493 relation_desc = relation_desc
494 .with_column("total_memory", SqlScalarType::String.nullable(true))
495 .with_column(
496 "total_records",
497 SqlScalarType::Numeric { max_scale: None }.nullable(true),
498 );
499 }
500 ExplainAnalyzeComputationProperty::Cpu if skew => {
501 relation_desc = relation_desc
502 .with_column(
503 "max_operator_cpu_ratio",
504 SqlScalarType::Numeric { max_scale: None }.nullable(true),
505 )
506 .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
507 .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
508 .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
509 }
510 ExplainAnalyzeComputationProperty::Cpu => {
511 relation_desc = relation_desc
512 .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
513 }
514 }
515 }
516
517 Ok(StatementDesc::new(Some(relation_desc.finish())))
518}
519
520pub fn describe_explain_timestamp(
521 scx: &StatementContext,
522 ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
523) -> Result<StatementDesc, PlanError> {
524 let relation_desc = RelationDesc::builder()
525 .with_column("Timestamp", SqlScalarType::String.nullable(false))
526 .finish();
527
528 Ok(StatementDesc::new(Some(relation_desc))
529 .with_params(describe_select(scx, select)?.param_types))
530}
531
532pub fn describe_explain_schema(
533 _: &StatementContext,
534 ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
535) -> Result<StatementDesc, PlanError> {
536 let relation_desc = RelationDesc::builder()
537 .with_column("Schema", SqlScalarType::String.nullable(false))
538 .finish();
539 Ok(StatementDesc::new(Some(relation_desc)))
540}
541
// Declares the options accepted for `ExplainPlanOption`. Each entry lists an
// option name, the type of its value, and the default to use.
//
// NOTE(review): the macro presumably generates the `ExplainPlanOptionExtracted`
// struct (one snake_case field per option) that is converted into an
// `ExplainConfig` by the `TryFrom` impl in this file — confirm against the
// macro definition. Options typed `Option<bool>` have their final default
// decided in that conversion (or are passed through as overrides) rather
// than here.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
584
585impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
586 type Error = PlanError;
587
588 fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
589 if v.raw {
592 v.raw_plans = true;
593 v.raw_syntax = true;
594 }
595
596 let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();
599
600 Ok(ExplainConfig {
601 arity: v.arity.unwrap_or(enable_on_prod),
602 cardinality: v.cardinality,
603 column_names: v.column_names,
604 filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
605 humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
606 join_impls: v.join_implementations,
607 keys: v.keys,
608 linear_chains: !v.raw_plans && v.linear_chains,
609 no_fast_path: v.no_fast_path,
610 no_notices: v.no_notices,
611 node_ids: v.node_identifiers,
612 non_negative: v.non_negative,
613 raw_plans: v.raw_plans,
614 raw_syntax: v.raw_syntax,
615 verbose_syntax: false,
616 redacted: v.redacted,
617 subtree_size: v.subtree_size,
618 equivalences: v.equivalences,
619 timing: v.timing,
620 types: v.types,
621 features: OptimizerFeatureOverrides {
623 enable_guard_subquery_tablefunc: Default::default(),
624 enable_eager_delta_joins: v.enable_eager_delta_joins,
625 enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
626 enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
627 enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
628 enable_consolidate_after_union_negate: Default::default(),
629 enable_reduce_mfp_fusion: Default::default(),
630 enable_cardinality_estimates: Default::default(),
631 persist_fast_path_limit: Default::default(),
632 reoptimize_imported_views: v.reoptimize_imported_views,
633 enable_reduce_reduction: Default::default(),
634 enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
635 enable_projection_pushdown_after_relation_cse: v
636 .enable_projection_pushdown_after_relation_cse,
637 enable_less_reduce_in_eqprop: Default::default(),
638 enable_dequadratic_eqprop_map: Default::default(),
639 enable_eq_classes_withholding_errors: Default::default(),
640 enable_fast_path_plan_insights: Default::default(),
641 },
642 })
643 }
644}
645
646fn plan_explainee(
647 scx: &StatementContext,
648 explainee: Explainee<Aug>,
649 params: &Params,
650) -> Result<plan::Explainee, PlanError> {
651 use crate::plan::ExplaineeStatement;
652
653 let is_replan = matches!(
654 explainee,
655 Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
656 );
657
658 let explainee = match explainee {
659 Explainee::View(name) | Explainee::ReplanView(name) => {
660 let item = scx.get_item_by_resolved_name(&name)?;
661 let item_type = item.item_type();
662 if item_type != CatalogItemType::View {
663 sql_bail!("Expected {name} to be a view, not a {item_type}");
664 }
665 match is_replan {
666 true => crate::plan::Explainee::ReplanView(item.id()),
667 false => crate::plan::Explainee::View(item.id()),
668 }
669 }
670 Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
671 let item = scx.get_item_by_resolved_name(&name)?;
672 let item_type = item.item_type();
673 if item_type != CatalogItemType::MaterializedView {
674 sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
675 }
676 match is_replan {
677 true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
678 false => crate::plan::Explainee::MaterializedView(item.id()),
679 }
680 }
681 Explainee::Index(name) | Explainee::ReplanIndex(name) => {
682 let item = scx.get_item_by_resolved_name(&name)?;
683 let item_type = item.item_type();
684 if item_type != CatalogItemType::Index {
685 sql_bail!("Expected {name} to be an index, not a {item_type}");
686 }
687 match is_replan {
688 true => crate::plan::Explainee::ReplanIndex(item.id()),
689 false => crate::plan::Explainee::Index(item.id()),
690 }
691 }
692 Explainee::Select(select, broken) => {
693 let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
694 crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
695 }
696 Explainee::CreateView(mut stmt, broken) => {
697 if stmt.if_exists != IfExistsBehavior::Skip {
698 stmt.if_exists = IfExistsBehavior::Skip;
703 } else {
704 sql_bail!(
705 "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
706 (the behavior is implied within the scope of an enclosing EXPLAIN)"
707 );
708 }
709
710 let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
711 sql_bail!("expected CreateViewPlan plan");
712 };
713
714 crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
715 }
716 Explainee::CreateMaterializedView(mut stmt, broken) => {
717 if stmt.if_exists != IfExistsBehavior::Skip {
718 stmt.if_exists = IfExistsBehavior::Skip;
723 } else {
724 sql_bail!(
725 "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
726 (the behavior is implied within the scope of an enclosing EXPLAIN)"
727 );
728 }
729
730 let Plan::CreateMaterializedView(plan) =
731 ddl::plan_create_materialized_view(scx, *stmt)?
732 else {
733 sql_bail!("expected CreateMaterializedViewPlan plan");
734 };
735
736 crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
737 broken,
738 plan,
739 })
740 }
741 Explainee::CreateIndex(mut stmt, broken) => {
742 if !stmt.if_not_exists {
743 stmt.if_not_exists = true;
746 } else {
747 sql_bail!(
748 "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
749 (the behavior is implied within the scope of an enclosing EXPLAIN)"
750 );
751 }
752
753 let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
754 sql_bail!("expected CreateIndexPlan plan");
755 };
756
757 crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
758 }
759 };
760
761 Ok(explainee)
762}
763
764pub fn plan_explain_plan(
765 scx: &StatementContext,
766 explain: ExplainPlanStatement<Aug>,
767 params: &Params,
768) -> Result<Plan, PlanError> {
769 let (format, verbose_syntax) = match explain.format() {
770 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
771 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
772 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
773 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
774 };
775 let stage = explain.stage();
776
777 let mut config = {
779 let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
780
781 if !scx.catalog.system_vars().persist_stats_filter_enabled() {
782 with_options.filter_pushdown = Some(false);
784 }
785
786 ExplainConfig::try_from(with_options)?
787 };
788 config.verbose_syntax = verbose_syntax;
789
790 let explainee = plan_explainee(scx, explain.explainee, params)?;
791
792 Ok(Plan::ExplainPlan(ExplainPlanPlan {
793 stage,
794 format,
795 config,
796 explainee,
797 }))
798}
799
800pub fn plan_explain_schema(
801 scx: &StatementContext,
802 explain_schema: ExplainSinkSchemaStatement<Aug>,
803) -> Result<Plan, PlanError> {
804 let ExplainSinkSchemaStatement {
805 schema_for,
806 format: _,
808 mut statement,
809 } = explain_schema;
810
811 statement.name = Some(UnresolvedItemName::qualified(&[
815 ident!("mz_catalog"),
816 ident!("mz_explain_schema"),
817 ]));
818
819 crate::pure::purify_create_sink_avro_doc_on_options(
820 scx.catalog,
821 *statement.from.item_id(),
822 &mut statement.format,
823 )?;
824
825 match ddl::plan_create_sink(scx, statement)? {
826 Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
827 StorageSinkConnection::Kafka(KafkaSinkConnection {
828 format:
829 KafkaSinkFormat {
830 key_format,
831 value_format:
832 KafkaSinkFormatType::Avro {
833 schema: value_schema,
834 ..
835 },
836 ..
837 },
838 ..
839 }) => {
840 let schema = match schema_for {
841 ExplainSinkSchemaFor::Key => key_format
842 .and_then(|f| match f {
843 KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
844 _ => None,
845 })
846 .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
847 ExplainSinkSchemaFor::Value => value_schema,
848 };
849
850 Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
851 sink_from: sink.from,
852 json_schema: schema,
853 }))
854 }
855 _ => bail_unsupported!(
856 "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
857 ),
858 },
859 _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
860 }
861}
862
863pub fn plan_explain_pushdown(
864 scx: &StatementContext,
865 statement: ExplainPushdownStatement<Aug>,
866 params: &Params,
867) -> Result<Plan, PlanError> {
868 scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
869 let explainee = plan_explainee(scx, statement.explainee, params)?;
870 Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
871}
872
873pub fn plan_explain_analyze_object(
874 scx: &StatementContext,
875 statement: ExplainAnalyzeObjectStatement<Aug>,
876 params: &Params,
877) -> Result<Plan, PlanError> {
878 let explainee_name = statement
879 .explainee
880 .name()
881 .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
882 .full_name_str();
883 let explainee = plan_explainee(scx, statement.explainee, params)?;
884
885 match explainee {
886 plan::Explainee::Index(_index_id) => (),
887 plan::Explainee::MaterializedView(_item_id) => (),
888 _ => {
889 return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
890 }
891 };
892
893 let mut ctes = Vec::with_capacity(4); let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
908 let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
909 let mut predicates = vec![format!("mo.name = '{}'", explainee_name)];
910 let mut order_by = vec!["mlm.lir_id DESC"];
911
912 match statement.properties {
913 ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
914 properties,
915 skew,
916 }) => {
917 let mut worker_id = None;
918 let mut seen_properties = BTreeSet::new();
919 for property in properties {
920 if !seen_properties.insert(property) {
922 continue;
923 }
924
925 match property {
926 ExplainAnalyzeComputationProperty::Memory => {
927 ctes.push((
928 "summary_memory",
929 r#"
930 SELECT mlm.global_id AS global_id,
931 mlm.lir_id AS lir_id,
932 SUM(mas.size) AS total_memory,
933 SUM(mas.records) AS total_records,
934 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
935 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
936 FROM mz_introspection.mz_lir_mapping mlm
937 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
938 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
939 ON (mas.operator_id = valid_id)
940GROUP BY mlm.global_id, mlm.lir_id"#,
941 ));
942 from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");
943
944 if skew {
945 ctes.push((
946 "per_worker_memory",
947 r#"
948 SELECT mlm.global_id AS global_id,
949 mlm.lir_id AS lir_id,
950 mas.worker_id AS worker_id,
951 SUM(mas.size) AS worker_memory,
952 SUM(mas.records) AS worker_records
953 FROM mz_introspection.mz_lir_mapping mlm
954 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
955 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
956 ON (mas.operator_id = valid_id)
957GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
958 ));
959 from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");
960
961 if let Some(worker_id) = worker_id {
962 predicates.push(format!("pwm.worker_id = {worker_id}"));
963 } else {
964 worker_id = Some("pwm.worker_id");
965 columns.push("pwm.worker_id AS worker_id");
966 order_by.push("worker_id");
967 }
968
969 columns.extend([
970 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
971 "pg_size_pretty(pwm.worker_memory) AS worker_memory",
972 "pg_size_pretty(sm.avg_memory) AS avg_memory",
973 "pg_size_pretty(sm.total_memory) AS total_memory",
974 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
975 "pwm.worker_records AS worker_records",
976 "sm.avg_records AS avg_records",
977 "sm.total_records AS total_records",
978 ]);
979 } else {
980 columns.extend([
981 "pg_size_pretty(sm.total_memory) AS total_memory",
982 "sm.total_records AS total_records",
983 ]);
984 }
985 }
986 ExplainAnalyzeComputationProperty::Cpu => {
987 ctes.push((
988 "summary_cpu",
989 r#"
990 SELECT mlm.global_id AS global_id,
991 mlm.lir_id AS lir_id,
992 SUM(mse.elapsed_ns) AS total_ns,
993 CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
994 FROM mz_introspection.mz_lir_mapping mlm
995 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
996 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
997 ON (mse.id = valid_id)
998GROUP BY mlm.global_id, mlm.lir_id"#,
999 ));
1000 from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");
1001
1002 if skew {
1003 ctes.push((
1004 "per_worker_cpu",
1005 r#"
1006 SELECT mlm.global_id AS global_id,
1007 mlm.lir_id AS lir_id,
1008 mse.worker_id AS worker_id,
1009 SUM(mse.elapsed_ns) AS worker_ns
1010 FROM mz_introspection.mz_lir_mapping mlm
1011 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1012 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1013 ON (mse.id = valid_id)
1014GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1015 ));
1016 from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");
1017
1018 if let Some(worker_id) = worker_id {
1019 predicates.push(format!("pwc.worker_id = {worker_id}"));
1020 } else {
1021 worker_id = Some("pwc.worker_id");
1022 columns.push("pwc.worker_id AS worker_id");
1023 order_by.push("worker_id");
1024 }
1025
1026 columns.extend([
1027 "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
1028 "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
1029 "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
1030 ]);
1031 }
1032 columns.push(
1033 "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
1034 );
1035 }
1036 }
1037 }
1038 }
1039 ExplainAnalyzeProperty::Hints => {
1040 columns.extend([
1041 "megsa.levels AS levels",
1042 "megsa.to_cut AS to_cut",
1043 "megsa.hint AS hint",
1044 "pg_size_pretty(megsa.savings) AS savings",
1045 ]);
1046 from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
1047 "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
1048 mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
1049 }
1050 }
1051
1052 from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");
1053
1054 let ctes = if !ctes.is_empty() {
1055 format!(
1056 "WITH {}",
1057 separated(
1058 ",\n",
1059 ctes.iter()
1060 .map(|(name, defn)| format!("{name} AS ({defn})"))
1061 )
1062 )
1063 } else {
1064 String::new()
1065 };
1066 let columns = separated(", ", columns);
1067 let from = separated(" ", from);
1068 let predicates = separated(" AND ", predicates);
1069 let order_by = separated(", ", order_by);
1070 let query = format!(
1071 r#"{ctes}
1072SELECT {columns}
1073FROM {from}
1074WHERE {predicates}
1075ORDER BY {order_by}"#
1076 );
1077
1078 if statement.as_sql {
1079 let rows = vec![Row::pack_slice(&[Datum::String(
1080 &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1081 PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1082 })?,
1083 )])];
1084 let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1085
1086 Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1087 } else {
1088 let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1089 show_select.plan()
1090 }
1091}
1092
/// Plans `EXPLAIN ANALYZE CLUSTER` by assembling a SQL query over
/// `mz_introspection` relations.
///
/// The query is built from string fragments: the base relation is
/// `mz_introspection.mz_mappable_objects`, and every requested property
/// (memory, CPU) contributes CTEs, joins, output columns and ordering terms.
/// When `skew` is set, per-worker figures and ratios are reported in addition
/// to totals.
///
/// If `statement.as_sql` is set, the result is a single-row, single-column
/// plan holding the pretty-printed SQL text; otherwise the query is planned
/// via `ShowSelect::new_from_bare_query`.
///
/// `_params` is unused.
///
/// # Errors
///
/// Returns a `PlanError` if pretty-printing the generated SQL fails or if
/// planning the generated query fails.
pub fn plan_explain_analyze_cluster(
    scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
    _params: &Params,
) -> Result<Plan, PlanError> {
    // `(name, definition)` pairs, rendered below as `WITH name AS (definition), ...`.
    // The capacity is only a hint: the non-skew paths push two CTEs per
    // property, the skew paths push five.
    let mut ctes = Vec::with_capacity(4);
    // Every row starts with the object's name and global ID.
    let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
    let mut predicates = vec![];
    let mut order_by = vec![];

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
    // SQL expression naming the worker column of the first skew property that
    // was processed; later skew properties are constrained to equal it.
    let mut worker_id = None;
    let mut seen_properties = BTreeSet::new();
    for property in properties {
        // Handle each property at most once, even if it was listed repeatedly.
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            ExplainAnalyzeComputationProperty::Memory => {
                if skew {
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        // Another property already exposes a worker column:
                        // align this property's rows with it.
                        predicates.push(format!("om.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("om.worker_id");
                        columns.push("om.worker_id AS worker_id");
                        // `worker_id` is appended to `order_by` after this
                        // property's own ordering terms (see below).
                        set_worker_id = true;
                    };

                    // Per (object, LIR operator): totals and per-worker averages.
                    ctes.push((
                        "per_operator_memory_summary",
                        r#"
SELECT mlm.global_id AS global_id,
 mlm.lir_id AS lir_id,
 SUM(mas.size) AS total_memory,
 SUM(mas.records) AS total_records,
 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
FROM mz_introspection.mz_lir_mapping mlm
 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
 ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    // Per (object, LIR operator, worker): that worker's memory and records.
                    ctes.push((
                        "per_operator_memory_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
 mlm.lir_id AS lir_id,
 mas.worker_id AS worker_id,
 SUM(mas.size) AS worker_memory,
 SUM(mas.records) AS worker_records
FROM mz_introspection.mz_lir_mapping mlm
 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
 ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                    ));

                    // Per (object, LIR operator, worker): worker value divided by the operator average.
                    ctes.push((
                        "per_operator_memory_ratios",
                        r#"
SELECT pompw.global_id AS global_id,
 pompw.lir_id AS lir_id,
 pompw.worker_id AS worker_id,
 CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
 CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
 FROM per_operator_memory_per_worker pompw
 JOIN per_operator_memory_summary poms
 USING (global_id, lir_id)
"#,
                    ));

                    // Per (object, worker): worst operator ratios and summed worker usage.
                    ctes.push((
                        "object_memory",
                        r#"
SELECT pompw.global_id AS global_id,
 pompw.worker_id AS worker_id,
 MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
 MAX(pomr.records_ratio) AS max_operator_records_ratio,
 SUM(pompw.worker_memory) AS worker_memory,
 SUM(pompw.worker_records) AS worker_records
FROM per_operator_memory_per_worker pompw
 JOIN per_operator_memory_ratios pomr
 USING (global_id, worker_id, lir_id)
GROUP BY pompw.global_id, pompw.worker_id
"#,
                    ));

                    // Per object: totals and per-worker averages derived from `object_memory`.
                    ctes.push(("object_average_memory", r#"
SELECT om.global_id AS global_id,
 SUM(om.worker_memory) AS total_memory,
 CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
 SUM(om.worker_records) AS total_records,
 CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
 FROM object_memory om
GROUP BY om.global_id"#));

                    from.push("LEFT JOIN object_memory om USING (global_id)");
                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");

                    columns.extend([
                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
                        "pg_size_pretty(om.worker_memory) AS worker_memory",
                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
                        "pg_size_pretty(oam.total_memory) AS total_memory",
                        "om.max_operator_records_ratio AS max_operator_records_ratio",
                        "om.worker_records AS worker_records",
                        "oam.avg_records AS avg_records",
                        "oam.total_records AS total_records",
                    ]);

                    // NOTE(review): `worker_memory` is also the output alias of a
                    // `pg_size_pretty(...)` column above; if ORDER BY resolves to
                    // the output alias this sorts the formatted text -- confirm.
                    order_by.extend([
                        "max_operator_memory_ratio DESC",
                        "max_operator_records_ratio DESC",
                        "worker_memory DESC",
                        "worker_records DESC",
                    ]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // Per (object, LIR operator): total memory and records.
                    ctes.push((
                        "per_operator_memory_totals",
                        r#"
 SELECT mlm.global_id AS global_id,
 mlm.lir_id AS lir_id,
 SUM(mas.size) AS total_memory,
 SUM(mas.records) AS total_records
 FROM mz_introspection.mz_lir_mapping mlm
 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
 ON (mas.operator_id = valid_id)
 GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    // Per object: operator totals summed up.
                    ctes.push((
                        "object_memory_totals",
                        r#"
SELECT pomt.global_id AS global_id,
 SUM(pomt.total_memory) AS total_memory,
 SUM(pomt.total_records) AS total_records
FROM per_operator_memory_totals pomt
GROUP BY pomt.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
                    columns.extend([
                        "pg_size_pretty(omt.total_memory) AS total_memory",
                        "omt.total_records AS total_records",
                    ]);
                    // NOTE(review): `total_memory` is the alias of a
                    // `pg_size_pretty(...)` column; confirm the intended sort key.
                    order_by.extend(["total_memory DESC", "total_records DESC"]);
                }
            }
            ExplainAnalyzeComputationProperty::Cpu => {
                if skew {
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        // Another property already exposes a worker column:
                        // align this property's rows with it.
                        predicates.push(format!("oc.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("oc.worker_id");
                        columns.push("oc.worker_id AS worker_id");
                        // `worker_id` is appended to `order_by` after this
                        // property's own ordering terms (see below).
                        set_worker_id = true;
                    };

                    // Per (object, LIR operator): total and per-worker average elapsed time.
                    ctes.push((
                        "per_operator_cpu_summary",
                        r#"
SELECT mlm.global_id AS global_id,
 mlm.lir_id AS lir_id,
 SUM(mse.elapsed_ns) AS total_ns,
 CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
FROM mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
 ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
));

                    // Per (object, LIR operator, worker): that worker's elapsed time.
                    ctes.push((
                        "per_operator_cpu_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
 mlm.lir_id AS lir_id,
 mse.worker_id AS worker_id,
 SUM(mse.elapsed_ns) AS worker_ns
FROM mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
 ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                    ));

                    // Per (object, LIR operator, worker): worker time divided by the operator average.
                    ctes.push((
                        "per_operator_cpu_ratios",
                        r#"
SELECT pocpw.global_id AS global_id,
 pocpw.lir_id AS lir_id,
 pocpw.worker_id AS worker_id,
 CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
FROM per_operator_cpu_per_worker pocpw
 JOIN per_operator_cpu_summary pocs
 USING (global_id, lir_id)
"#,
                    ));

                    // Per (object, worker): worst operator ratio and summed worker time.
                    // NOTE(review): the SQL alias `pomr` below names
                    // `per_operator_cpu_ratios`; it matches the alias used in the
                    // memory branch, which looks like a copy-paste leftover.
                    ctes.push((
                        "object_cpu",
                        r#"
SELECT pocpw.global_id AS global_id,
 pocpw.worker_id AS worker_id,
 MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
 SUM(pocpw.worker_ns) AS worker_ns
FROM per_operator_cpu_per_worker pocpw
 JOIN per_operator_cpu_ratios pomr
 USING (global_id, worker_id, lir_id)
GROUP BY pocpw.global_id, pocpw.worker_id
"#,
                    ));

                    // Per object: total and per-worker average derived from `object_cpu`.
                    ctes.push((
                        "object_average_cpu",
                        r#"
SELECT oc.global_id AS global_id,
 SUM(oc.worker_ns) AS total_ns,
 CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
 FROM object_cpu oc
GROUP BY oc.global_id"#,));

                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");

                    // Nanosecond counts are converted to intervals via microseconds.
                    columns.extend([
                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
                    ]);

                    order_by.extend(["max_operator_cpu_ratio DESC", "worker_elapsed DESC"]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // Per (object, LIR operator): total elapsed time.
                    ctes.push((
                        "per_operator_cpu_totals",
                        r#"
 SELECT mlm.global_id AS global_id,
 mlm.lir_id AS lir_id,
 SUM(mse.elapsed_ns) AS total_ns
 FROM mz_introspection.mz_lir_mapping mlm
 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
 ON (mse.id = valid_id)
 GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    // Per object: operator totals summed up.
                    ctes.push((
                        "object_cpu_totals",
                        r#"
SELECT poct.global_id AS global_id,
 SUM(poct.total_ns) AS total_ns
FROM per_operator_cpu_totals poct
GROUP BY poct.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
                    columns
                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
                    order_by.extend(["total_elapsed DESC"]);
                }
            }
        }
    }

    // Render the collected fragments. The `WITH` and `WHERE` clauses are
    // omitted entirely when there is nothing to put in them.
    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    let predicates = if !predicates.is_empty() {
        format!("WHERE {}", separated(" AND ", predicates))
    } else {
        String::new()
    };
    // Always present, so the `ORDER BY` clause is never empty; it acts as the
    // last ordering term after any property-specific ones.
    order_by.push("mo.name DESC");
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
{predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // Return the generated SQL itself as a single non-null string row.
        // The `80` argument is presumably the target line width -- confirm.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        // Plan the generated SQL; the resolved IDs it returns are not used here.
        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        show_select.plan()
    }
}
1458
1459pub fn plan_explain_timestamp(
1460 scx: &StatementContext,
1461 explain: ExplainTimestampStatement<Aug>,
1462) -> Result<Plan, PlanError> {
1463 let (format, _verbose_syntax) = match explain.format() {
1464 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1465 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1466 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1467 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1468 };
1469
1470 let raw_plan = {
1471 let query::PlannedRootQuery {
1472 expr: raw_plan,
1473 desc: _,
1474 finishing: _,
1475 scope: _,
1476 } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1477 if raw_plan.contains_parameters()? {
1478 return Err(PlanError::ParameterNotAllowed(
1479 "EXPLAIN TIMESTAMP".to_string(),
1480 ));
1481 }
1482
1483 raw_plan
1484 };
1485 let when = query::plan_as_of(scx, explain.select.as_of)?;
1486
1487 Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1488 format,
1489 raw_plan,
1490 when,
1491 }))
1492}
1493
1494#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."]
1497pub fn plan_query(
1498 scx: &StatementContext,
1499 query: Query<Aug>,
1500 params: &Params,
1501 lifetime: QueryLifetime,
1502) -> Result<query::PlannedRootQuery<MirRelationExpr>, PlanError> {
1503 let query::PlannedRootQuery {
1504 mut expr,
1505 desc,
1506 finishing,
1507 scope,
1508 } = query::plan_root_query(scx, query, lifetime)?;
1509 expr.bind_parameters(scx, lifetime, params)?;
1510
1511 Ok(query::PlannedRootQuery {
1512 expr: expr.lower(scx.catalog.system_vars(), None)?,
1514 desc,
1515 finishing,
1516 scope,
1517 })
1518}
1519
// Generates `SubscribeOptionExtracted`, the typed form of the `SUBSCRIBE`
// options. Its `snapshot` and `progress` fields are optional booleans; the
// callers in this file supply the defaults.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1521
1522pub fn describe_subscribe(
1523 scx: &StatementContext,
1524 stmt: SubscribeStatement<Aug>,
1525) -> Result<StatementDesc, PlanError> {
1526 let relation_desc = match stmt.relation {
1527 SubscribeRelation::Name(name) => {
1528 let item = scx.get_item_by_resolved_name(&name)?;
1529 item.desc(&scx.catalog.resolve_full_name(item.name()))?
1530 .into_owned()
1531 }
1532 SubscribeRelation::Query(query) => {
1533 let query::PlannedRootQuery { desc, .. } =
1534 query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
1535 desc
1536 }
1537 };
1538 let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
1539 let progress = progress.unwrap_or(false);
1540 let mut desc = RelationDesc::builder().with_column(
1541 "mz_timestamp",
1542 SqlScalarType::Numeric {
1543 max_scale: Some(NumericMaxScale::ZERO),
1544 }
1545 .nullable(false),
1546 );
1547 if progress {
1548 desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
1549 }
1550
1551 let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
1552 match stmt.output {
1553 SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
1554 desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
1555 for (name, mut ty) in relation_desc.into_iter() {
1556 if progress {
1557 ty.nullable = true;
1558 }
1559 desc = desc.with_column(name, ty);
1560 }
1561 }
1562 SubscribeOutput::EnvelopeUpsert { key_columns }
1563 | SubscribeOutput::EnvelopeDebezium { key_columns } => {
1564 desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
1565 let key_columns = key_columns
1566 .into_iter()
1567 .map(normalize::column_name)
1568 .collect_vec();
1569 let mut before_values_desc = RelationDesc::builder();
1570 let mut after_values_desc = RelationDesc::builder();
1571
1572 for column_name in &key_columns {
1574 let mut column_ty = relation_desc
1575 .get_by_name(column_name)
1576 .map(|(_pos, ty)| ty.clone())
1577 .ok_or_else(|| PlanError::UnknownColumn {
1578 table: None,
1579 column: column_name.clone(),
1580 similar: Box::new([]),
1581 })?;
1582 if progress {
1583 column_ty.nullable = true;
1584 }
1585 desc = desc.with_column(column_name, column_ty);
1586 }
1587
1588 for (mut name, mut ty) in relation_desc
1591 .into_iter()
1592 .filter(|(name, _ty)| !key_columns.contains(name))
1593 {
1594 ty.nullable = true;
1595 before_values_desc =
1596 before_values_desc.with_column(format!("before_{}", name), ty.clone());
1597 if debezium {
1598 name = format!("after_{}", name).into();
1599 }
1600 after_values_desc = after_values_desc.with_column(name, ty);
1601 }
1602
1603 if debezium {
1604 desc = desc.concat(before_values_desc);
1605 }
1606 desc = desc.concat(after_values_desc);
1607 }
1608 }
1609 Ok(StatementDesc::new(Some(desc.finish())))
1610}
1611
/// Plans a `SUBSCRIBE` statement into a `Plan::Subscribe`.
///
/// * `params` is only consulted when the subscription is over a query; it is
///   forwarded to `plan_query`.
/// * `copy_to` is stored verbatim in the resulting plan.
///
/// The `SNAPSHOT` option defaults to `true` and the `PROGRESS` option to
/// `false` when not given.
///
/// # Errors
///
/// Fails if the named item has no relation description, if a required
/// feature flag is disabled, or if the envelope key / `WITHIN TIMESTAMP ORDER
/// BY` expressions cannot be planned directly against the output columns.
///
/// # Panics
///
/// Panics if planning a subscribed query yields a non-trivial row set
/// finishing (see the `assert!` below).
pub fn plan_subscribe(
    scx: &StatementContext,
    SubscribeStatement {
        relation,
        options,
        as_of,
        up_to,
        output,
    }: SubscribeStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<Plan, PlanError> {
    // Determine what is subscribed to, its description, and the scope used to
    // resolve column references in the output clauses below.
    let (from, desc, scope) = match relation {
        SubscribeRelation::Name(name) => {
            let entry = scx.get_item_by_resolved_name(&name)?;
            // Any failure to obtain a description is reported as "cannot be
            // subscribed to", naming the item and its type.
            let desc = match entry.desc(&scx.catalog.resolve_full_name(entry.name())) {
                Ok(desc) => desc,
                Err(..) => sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    entry.item_type(),
                ),
            };
            let item_name = match name {
                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
                _ => None,
            };
            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
            (
                SubscribeFrom::Id(entry.global_id()),
                desc.into_owned(),
                scope,
            )
        }
        SubscribeRelation::Query(query) => {
            // `plan_query` is deprecated but still used here.
            #[allow(deprecated)]
            let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?;
            // The plan has nowhere to carry a finishing, so it must be trivial
            // for the query's arity at this point.
            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
                &query.finishing,
                query.desc.arity()
            ));
            let desc = query.desc.clone();
            (
                SubscribeFrom::Query {
                    expr: query.expr,
                    desc: query.desc,
                },
                desc,
                query.scope,
            )
        }
    };

    let when = query::plan_as_of(scx, as_of)?;
    let up_to = up_to
        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
        .transpose()?;

    // Expression context shared by all output clauses; each clause overrides
    // only `name` via struct-update syntax.
    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
    let ecx = ExprContext {
        qcx: &qcx,
        name: "",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    // `(index, name)` pairs for the columns of the subscribed relation.
    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
    let output = match output {
        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
        SubscribeOutput::EnvelopeUpsert { key_columns } => {
            // Treat each key column as an `ORDER BY <identifier>` expression so
            // that the ordering planner resolves it against the output columns.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE UPSERT KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            // Extra mapped expressions presumably mean a key is not a plain
            // output column -- reject those keys.
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
            }
            plan::SubscribeOutput::EnvelopeUpsert {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::EnvelopeDebezium { key_columns } => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
            // Same key handling as for `ENVELOPE UPSERT`.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE DEBEZIUM KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
            }
            plan::SubscribeOutput::EnvelopeDebezium {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
            // Make `mz_diff` addressable as column 0 and shift every relation
            // column up by one.
            let mz_diff = "mz_diff".into();
            let output_columns = std::iter::once((0, &mz_diff))
                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
                .collect_vec();
            match query::plan_order_by_exprs(
                &ExprContext {
                    name: "WITHIN TIMESTAMP ORDER BY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            ) {
                // An unqualified unknown-column error about `mz_diff` itself is
                // replaced by the dedicated SUBSCRIBE error.
                Err(PlanError::UnknownColumn {
                    table: None,
                    column,
                    similar: _,
                }) if &column == &mz_diff => {
                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                }
                Err(e) => return Err(e),
                Ok((order_by, map_exprs)) => {
                    // As with the envelope keys, extra mapped expressions are
                    // rejected.
                    if !map_exprs.is_empty() {
                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                    }

                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
                }
            }
        }
    };

    let SubscribeOptionExtracted {
        progress, snapshot, ..
    } = options.try_into()?;
    Ok(Plan::Subscribe(SubscribePlan {
        from,
        when,
        up_to,
        // Snapshots are on unless explicitly disabled.
        with_snapshot: snapshot.unwrap_or(true),
        copy_to,
        // Progress messages are off unless explicitly enabled.
        emit_progress: progress.unwrap_or(false),
        output,
    }))
}
1785
1786pub fn describe_copy_from_table(
1787 scx: &StatementContext,
1788 table_name: <Aug as AstInfo>::ItemName,
1789 columns: Vec<Ident>,
1790) -> Result<StatementDesc, PlanError> {
1791 let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1792 Ok(StatementDesc::new(Some(desc)))
1793}
1794
1795pub fn describe_copy_item(
1796 scx: &StatementContext,
1797 object_name: <Aug as AstInfo>::ItemName,
1798 columns: Vec<Ident>,
1799) -> Result<StatementDesc, PlanError> {
1800 let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1801 Ok(StatementDesc::new(Some(desc)))
1802}
1803
1804pub fn describe_copy(
1805 scx: &StatementContext,
1806 CopyStatement {
1807 relation,
1808 direction,
1809 ..
1810 }: CopyStatement<Aug>,
1811) -> Result<StatementDesc, PlanError> {
1812 Ok(match (relation, direction) {
1813 (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1814 describe_copy_item(scx, name, columns)?
1815 }
1816 (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1817 describe_copy_from_table(scx, name, columns)?
1818 }
1819 (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1820 (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1821 }
1822 .with_is_copy())
1823}
1824
1825fn plan_copy_to_expr(
1826 scx: &StatementContext,
1827 select_plan: SelectPlan,
1828 desc: RelationDesc,
1829 to: &Expr<Aug>,
1830 format: CopyFormat,
1831 options: CopyOptionExtracted,
1832) -> Result<Plan, PlanError> {
1833 let conn_id = match options.aws_connection {
1834 Some(conn_id) => CatalogItemId::from(conn_id),
1835 None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1836 };
1837 let connection = scx.get_item(&conn_id).connection()?;
1838
1839 match connection {
1840 mz_storage_types::connections::Connection::Aws(_) => {}
1841 _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1842 }
1843
1844 let format = match format {
1845 CopyFormat::Csv => {
1846 let quote = extract_byte_param_value(options.quote, "quote")?;
1847 let escape = extract_byte_param_value(options.escape, "escape")?;
1848 let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1849 S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1850 CopyCsvFormatParams::try_new(
1851 delimiter,
1852 quote,
1853 escape,
1854 options.header,
1855 options.null,
1856 )
1857 .map_err(|e| sql_err!("{}", e))?,
1858 ))
1859 }
1860 CopyFormat::Parquet => {
1861 ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
1863 S3SinkFormat::Parquet
1864 }
1865 CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1866 CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1867 };
1868
1869 let mut to_expr = to.clone();
1871 transform_ast::transform(scx, &mut to_expr)?;
1872 let relation_type = RelationDesc::empty();
1873 let ecx = &ExprContext {
1874 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1875 name: "COPY TO target",
1876 scope: &Scope::empty(),
1877 relation_type: relation_type.typ(),
1878 allow_aggregates: false,
1879 allow_subqueries: false,
1880 allow_parameters: false,
1881 allow_windows: false,
1882 };
1883
1884 let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;
1885
1886 if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1887 sql_bail!(
1888 "MAX FILE SIZE cannot be less than {}",
1889 MIN_S3_SINK_FILE_SIZE
1890 );
1891 }
1892 if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1893 sql_bail!(
1894 "MAX FILE SIZE cannot be greater than {}",
1895 MAX_S3_SINK_FILE_SIZE
1896 );
1897 }
1898
1899 Ok(Plan::CopyTo(CopyToPlan {
1900 select_plan,
1901 desc,
1902 to,
1903 connection: connection.to_owned(),
1904 connection_id: conn_id,
1905 format,
1906 max_file_size: options.max_file_size.as_bytes(),
1907 }))
1908}
1909
1910fn plan_copy_from(
1911 scx: &StatementContext,
1912 target: &CopyTarget<Aug>,
1913 table_name: ResolvedItemName,
1914 columns: Vec<Ident>,
1915 format: CopyFormat,
1916 options: CopyOptionExtracted,
1917) -> Result<Plan, PlanError> {
1918 fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
1919 match option {
1920 Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
1921 None => Ok(()),
1922 }
1923 }
1924
1925 let source = match target {
1926 CopyTarget::Stdin => CopyFromSource::Stdin,
1927 CopyTarget::Expr(from) => {
1928 scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;
1929
1930 let mut from_expr = from.clone();
1932 transform_ast::transform(scx, &mut from_expr)?;
1933 let relation_type = RelationDesc::empty();
1934 let ecx = &ExprContext {
1935 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1936 name: "COPY FROM target",
1937 scope: &Scope::empty(),
1938 relation_type: relation_type.typ(),
1939 allow_aggregates: false,
1940 allow_subqueries: false,
1941 allow_parameters: false,
1942 allow_windows: false,
1943 };
1944 let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;
1945
1946 match options.aws_connection {
1947 Some(conn_id) => {
1948 let conn_id = CatalogItemId::from(conn_id);
1949
1950 let connection = match scx.get_item(&conn_id).connection()? {
1952 mz_storage_types::connections::Connection::Aws(conn) => conn,
1953 _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
1954 };
1955
1956 CopyFromSource::AwsS3 {
1957 uri: from,
1958 connection,
1959 connection_id: conn_id,
1960 }
1961 }
1962 None => CopyFromSource::Url(from),
1963 }
1964 }
1965 CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
1966 };
1967
1968 let params = match format {
1969 CopyFormat::Text => {
1970 only_available_with_csv(options.quote, "quote")?;
1971 only_available_with_csv(options.escape, "escape")?;
1972 only_available_with_csv(options.header, "HEADER")?;
1973 let delimiter =
1974 extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
1975 let null = match options.null {
1976 Some(null) => Cow::from(null),
1977 None => Cow::from("\\N"),
1978 };
1979 CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
1980 }
1981 CopyFormat::Csv => {
1982 let quote = extract_byte_param_value(options.quote, "quote")?;
1983 let escape = extract_byte_param_value(options.escape, "escape")?;
1984 let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1985 CopyFormatParams::Csv(
1986 CopyCsvFormatParams::try_new(
1987 delimiter,
1988 quote,
1989 escape,
1990 options.header,
1991 options.null,
1992 )
1993 .map_err(|e| sql_err!("{}", e))?,
1994 )
1995 }
1996 CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1997 CopyFormat::Parquet => CopyFormatParams::Parquet,
1998 };
1999
2000 let filter = match (options.files, options.pattern) {
2001 (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
2002 (Some(files), None) => Some(CopyFromFilter::Files(files)),
2003 (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
2004 (None, None) => None,
2005 };
2006
2007 if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
2008 bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
2009 }
2010
2011 let table_name_string = table_name.full_name_str();
2012
2013 let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;
2014
2015 let Some(mfp) = maybe_mfp else {
2016 sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
2017 };
2018
2019 Ok(Plan::CopyFrom(CopyFromPlan {
2020 target_id: id,
2021 target_name: table_name_string,
2022 source,
2023 columns,
2024 source_desc,
2025 mfp,
2026 params,
2027 filter,
2028 }))
2029}
2030
2031fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2032 match v {
2033 Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2034 Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2035 None => Ok(None),
2036 }
2037}
2038
// Generates `CopyOptionExtracted`, the typed form of the `COPY` options used
// by the planners in this file. Every option is optional except
// `MaxFileSize`, which falls back to `ByteSize::mb(256)` when not specified.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
2052
2053pub fn plan_copy(
2054 scx: &StatementContext,
2055 CopyStatement {
2056 relation,
2057 direction,
2058 target,
2059 options,
2060 }: CopyStatement<Aug>,
2061) -> Result<Plan, PlanError> {
2062 let options = CopyOptionExtracted::try_from(options)?;
2063 let format = options
2066 .format
2067 .as_ref()
2068 .map(|format| match format.to_lowercase().as_str() {
2069 "text" => Ok(CopyFormat::Text),
2070 "csv" => Ok(CopyFormat::Csv),
2071 "binary" => Ok(CopyFormat::Binary),
2072 "parquet" => Ok(CopyFormat::Parquet),
2073 _ => sql_bail!("unknown FORMAT: {}", format),
2074 })
2075 .transpose()?;
2076
2077 match (&direction, &target) {
2078 (CopyDirection::To, CopyTarget::Stdout) => {
2079 if options.delimiter.is_some() {
2080 sql_bail!("COPY TO does not support DELIMITER option yet");
2081 }
2082 if options.quote.is_some() {
2083 sql_bail!("COPY TO does not support QUOTE option yet");
2084 }
2085 if options.null.is_some() {
2086 sql_bail!("COPY TO does not support NULL option yet");
2087 }
2088 match relation {
2089 CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
2090 CopyRelation::Select(stmt) => Ok(plan_select(
2091 scx,
2092 stmt,
2093 &Params::empty(),
2094 Some(format.unwrap_or(CopyFormat::Text)),
2095 )?),
2096 CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
2097 scx,
2098 stmt,
2099 &Params::empty(),
2100 Some(format.unwrap_or(CopyFormat::Text)),
2101 )?),
2102 }
2103 }
2104 (CopyDirection::From, target) => match relation {
2105 CopyRelation::Named { name, columns } => plan_copy_from(
2106 scx,
2107 target,
2108 name,
2109 columns,
2110 format.unwrap_or(CopyFormat::Text),
2111 options,
2112 ),
2113 _ => sql_bail!("COPY FROM {} not supported", target),
2114 },
2115 (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
2116 if !scx.catalog.active_role_id().is_system() {
2121 scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
2122 }
2123
2124 let format = match format {
2125 Some(inner) => inner,
2126 _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
2127 };
2128
2129 let stmt = match relation {
2130 CopyRelation::Named { name, columns } => {
2131 if !columns.is_empty() {
2132 sql_bail!(
2134 "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
2135 );
2136 }
2137 let query = Query {
2139 ctes: CteBlock::empty(),
2140 body: SetExpr::Table(name),
2141 order_by: vec![],
2142 limit: None,
2143 offset: None,
2144 };
2145 SelectStatement { query, as_of: None }
2146 }
2147 CopyRelation::Select(stmt) => {
2148 if !stmt.query.order_by.is_empty() {
2149 sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
2150 }
2151 stmt
2152 }
2153 _ => sql_bail!("COPY {} {} not supported", direction, target),
2154 };
2155
2156 let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
2157 plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
2158 }
2159 _ => sql_bail!("COPY {} {} not supported", direction, target),
2160 }
2161}