1use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19
20use mz_arrow_util::builder::ArrowBuilder;
21use mz_expr::visit::Visit;
22use mz_expr::{MirRelationExpr, RowSetFinishing};
23use mz_ore::num::NonNeg;
24use mz_ore::soft_panic_or_log;
25use mz_ore::str::separated;
26use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
27use mz_repr::adt::numeric::NumericMaxScale;
28use mz_repr::bytes::ByteSize;
29use mz_repr::explain::{ExplainConfig, ExplainFormat};
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
32use mz_sql_parser::ast::{
33 CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
34 ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
35 ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
36 ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
37 SetExpr, SubscribeOutput, UnresolvedItemName,
38};
39use mz_sql_parser::ident;
40use mz_storage_types::sinks::{
41 KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
42 MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
43};
44
45use crate::ast::display::AstDisplay;
46use crate::ast::{
47 AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
48 DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
49 SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
50 UpdateStatement,
51};
52use crate::catalog::CatalogItemType;
53use crate::names::{Aug, ResolvedItemName};
54use crate::normalize;
55use crate::plan::query::{
56 ExprContext, QueryLifetime, offset_into_value, plan_as_of_or_up_to, plan_expr,
57};
58use crate::plan::scope::Scope;
59use crate::plan::statement::show::ShowSelect;
60use crate::plan::statement::{StatementContext, StatementDesc, ddl};
61use crate::plan::{
62 self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
63 ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
64};
65use crate::plan::{
66 CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
67 QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
68};
69use crate::plan::{CopyFromSource, with_options};
70use crate::session::vars::{self, ENABLE_COPY_FROM_REMOTE};
71
72pub fn describe_insert(
79 scx: &StatementContext,
80 InsertStatement {
81 table_name,
82 columns,
83 source,
84 returning,
85 }: InsertStatement<Aug>,
86) -> Result<StatementDesc, PlanError> {
87 let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
88 let desc = if returning.expr.is_empty() {
89 None
90 } else {
91 Some(returning.desc)
92 };
93 Ok(StatementDesc::new(desc))
94}
95
96pub fn plan_insert(
97 scx: &StatementContext,
98 InsertStatement {
99 table_name,
100 columns,
101 source,
102 returning,
103 }: InsertStatement<Aug>,
104 params: &Params,
105) -> Result<Plan, PlanError> {
106 let (id, mut expr, returning) =
107 query::plan_insert_query(scx, table_name, columns, source, returning)?;
108 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
109 let returning = returning
110 .expr
111 .into_iter()
112 .map(|mut expr| {
113 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
114 expr.lower_uncorrelated()
115 })
116 .collect::<Result<Vec<_>, _>>()?;
117
118 Ok(Plan::Insert(InsertPlan {
119 id,
120 values: expr,
121 returning,
122 }))
123}
124
125pub fn describe_delete(
126 scx: &StatementContext,
127 stmt: DeleteStatement<Aug>,
128) -> Result<StatementDesc, PlanError> {
129 query::plan_delete_query(scx, stmt)?;
130 Ok(StatementDesc::new(None))
131}
132
133pub fn plan_delete(
134 scx: &StatementContext,
135 stmt: DeleteStatement<Aug>,
136 params: &Params,
137) -> Result<Plan, PlanError> {
138 let rtw_plan = query::plan_delete_query(scx, stmt)?;
139 plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
140}
141
142pub fn describe_update(
143 scx: &StatementContext,
144 stmt: UpdateStatement<Aug>,
145) -> Result<StatementDesc, PlanError> {
146 query::plan_update_query(scx, stmt)?;
147 Ok(StatementDesc::new(None))
148}
149
150pub fn plan_update(
151 scx: &StatementContext,
152 stmt: UpdateStatement<Aug>,
153 params: &Params,
154) -> Result<Plan, PlanError> {
155 let rtw_plan = query::plan_update_query(scx, stmt)?;
156 plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
157}
158
159pub fn plan_read_then_write(
160 scx: &StatementContext,
161 kind: MutationKind,
162 params: &Params,
163 query::ReadThenWritePlan {
164 id,
165 mut selection,
166 finishing,
167 assignments,
168 }: query::ReadThenWritePlan,
169) -> Result<Plan, PlanError> {
170 selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
171 let mut assignments_outer = BTreeMap::new();
172 for (idx, mut set) in assignments {
173 set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
174 let set = set.lower_uncorrelated()?;
175 assignments_outer.insert(idx, set);
176 }
177
178 Ok(Plan::ReadThenWrite(ReadThenWritePlan {
179 id,
180 selection,
181 finishing,
182 assignments: assignments_outer,
183 kind,
184 returning: Vec::new(),
185 }))
186}
187
188pub fn describe_select(
189 scx: &StatementContext,
190 stmt: SelectStatement<Aug>,
191) -> Result<StatementDesc, PlanError> {
192 if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
193 return Ok(StatementDesc::new(Some(desc)));
194 }
195
196 let query::PlannedRootQuery { desc, .. } =
197 query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
198 Ok(StatementDesc::new(Some(desc)))
199}
200
201pub fn plan_select(
202 scx: &StatementContext,
203 select: SelectStatement<Aug>,
204 params: &Params,
205 copy_to: Option<CopyFormat>,
206) -> Result<Plan, PlanError> {
207 if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
208 return Ok(Plan::SideEffectingFunc(f));
209 }
210
211 let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
212 Ok(Plan::Select(plan))
213}
214
/// Plans the core of a `SELECT` statement, returning both the plan and the
/// relation description of its result.
///
/// Binds statement parameters into the query body and into the row-set
/// finishing; the top-level `LIMIT` and all `OFFSET`s must evaluate to
/// constants at planning time.
fn plan_select_inner(
    scx: &StatementContext,
    select: SelectStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<(SelectPlan, RelationDesc), PlanError> {
    let when = query::plan_as_of(scx, select.as_of.clone())?;
    let lifetime = QueryLifetime::OneShot;
    let query::PlannedRootQuery {
        mut expr,
        desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
    expr.bind_parameters(scx, lifetime, params)?;

    // Replace every `TopK` offset expression with the Int64 literal it
    // evaluates to; `offset_into_value` errors if it is not a valid constant.
    expr.try_visit_mut_pre(&mut |expr| {
        if let HirRelationExpr::TopK { offset, .. } = expr {
            let offset_value = offset_into_value(offset.take())?;
            *offset = HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64);
        }
        Ok::<(), PlanError>(())
    })?;
    // The top-level LIMIT must be a constant: NULL means "no limit", and any
    // other value must be a non-negative Int64.
    let limit = match finishing.limit {
        None => None,
        Some(mut limit) => {
            limit.bind_parameters(scx, lifetime, params)?;
            let Some(limit) = limit.as_literal() else {
                sql_bail!(
                    "Top-level LIMIT must be a constant expression, got {}",
                    limit
                )
            };
            match limit {
                Datum::Null => None,
                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
                _ => {
                    // Invalid literals should have been rejected before this
                    // point; reaching here indicates a planner bug.
                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
                    sql_bail!("LIMIT must be a non-negative INT or NULL")
                }
            }
        }
    };
    // The top-level OFFSET is likewise evaluated down to a constant.
    let offset = {
        let mut offset = finishing.offset.clone();
        offset.bind_parameters(scx, lifetime, params)?;
        let offset = offset_into_value(offset.take())?;
        offset
            .try_into()
            .expect("checked in offset_into_value that it is not negative")
    };

    let plan = SelectPlan {
        source: expr,
        when,
        finishing: RowSetFinishing {
            limit,
            offset,
            project: finishing.project,
            order_by: finishing.order_by,
        },
        copy_to,
        // Keep the original AST around (used downstream, e.g. for insights).
        select: Some(Box::new(select)),
    };

    Ok((plan, desc))
}
294
295pub fn describe_explain_plan(
296 scx: &StatementContext,
297 explain: ExplainPlanStatement<Aug>,
298) -> Result<StatementDesc, PlanError> {
299 let mut relation_desc = RelationDesc::builder();
300
301 match explain.stage() {
302 ExplainStage::RawPlan => {
303 let name = "Raw Plan";
304 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
305 }
306 ExplainStage::DecorrelatedPlan => {
307 let name = "Decorrelated Plan";
308 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
309 }
310 ExplainStage::LocalPlan => {
311 let name = "Locally Optimized Plan";
312 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
313 }
314 ExplainStage::GlobalPlan => {
315 let name = "Optimized Plan";
316 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
317 }
318 ExplainStage::PhysicalPlan => {
319 let name = "Physical Plan";
320 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
321 }
322 ExplainStage::Trace => {
323 relation_desc = relation_desc
324 .with_column("Time", SqlScalarType::UInt64.nullable(false))
325 .with_column("Path", SqlScalarType::String.nullable(false))
326 .with_column("Plan", SqlScalarType::String.nullable(false));
327 }
328 ExplainStage::PlanInsights => {
329 let name = "Plan Insights";
330 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
331 }
332 };
333 let relation_desc = relation_desc.finish();
334
335 Ok(
336 StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
337 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
338 _ => vec![],
339 }),
340 )
341}
342
343pub fn describe_explain_pushdown(
344 scx: &StatementContext,
345 statement: ExplainPushdownStatement<Aug>,
346) -> Result<StatementDesc, PlanError> {
347 let relation_desc = RelationDesc::builder()
348 .with_column("Source", SqlScalarType::String.nullable(false))
349 .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
350 .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
351 .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
352 .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
353 .finish();
354
355 Ok(
356 StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
357 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
358 _ => vec![],
359 }),
360 )
361}
362
/// Computes the statement description for `EXPLAIN ANALYZE ... <object>`.
///
/// With `AS SQL`, the output is a single `SQL` string column. Otherwise the
/// column set depends on the requested properties (memory, CPU, or hints)
/// and on whether a per-worker skew breakdown was requested; duplicate
/// properties contribute columns only once.
pub fn describe_explain_analyze_object(
    _scx: &StatementContext,
    statement: ExplainAnalyzeObjectStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // `AS SQL` renders a single string column.
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    match statement.properties {
        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
            properties,
            skew,
        }) => {
            // Every computation analysis leads with the operator column.
            let mut relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(false));

            // Skew analyses break results down per worker.
            if skew {
                relation_desc =
                    relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
            }

            // Add columns for each property at most once, in request order.
            let mut seen_properties = BTreeSet::new();
            for property in properties {
                if !seen_properties.insert(property) {
                    continue;
                }

                match property {
                    ExplainAnalyzeComputationProperty::Memory if skew => {
                        // Per-worker memory/record columns plus ratios
                        // against the average.
                        let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                        relation_desc = relation_desc
                            .with_column("memory_ratio", numeric.clone())
                            .with_column("worker_memory", SqlScalarType::String.nullable(true))
                            .with_column("avg_memory", SqlScalarType::String.nullable(true))
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column("records_ratio", numeric.clone())
                            .with_column("worker_records", numeric.clone())
                            .with_column("avg_records", numeric.clone())
                            .with_column("total_records", numeric);
                    }
                    ExplainAnalyzeComputationProperty::Memory => {
                        // Without skew, only totals are reported.
                        relation_desc = relation_desc
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column(
                                "total_records",
                                SqlScalarType::Numeric { max_scale: None }.nullable(true),
                            );
                    }
                    ExplainAnalyzeComputationProperty::Cpu => {
                        // With skew, per-worker elapsed-time columns come
                        // before the total.
                        if skew {
                            relation_desc = relation_desc
                                .with_column(
                                    "cpu_ratio",
                                    SqlScalarType::Numeric { max_scale: None }.nullable(true),
                                )
                                .with_column(
                                    "worker_elapsed",
                                    SqlScalarType::Interval.nullable(true),
                                )
                                .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
                        }
                        relation_desc = relation_desc
                            .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
                    }
                }
            }

            let relation_desc = relation_desc.finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
        ExplainAnalyzeProperty::Hints => {
            // Group-size hint columns; all nullable.
            let relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(true))
                .with_column("levels", SqlScalarType::Int64.nullable(true))
                .with_column("to_cut", SqlScalarType::Int64.nullable(true))
                .with_column("hint", SqlScalarType::Float64.nullable(true))
                .with_column("savings", SqlScalarType::String.nullable(true))
                .finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
    }
}
449
/// Computes the statement description for `EXPLAIN ANALYZE ... CLUSTER`.
///
/// With `AS SQL`, the output is a single `SQL` string column. Otherwise the
/// output leads with object identification columns, followed by columns for
/// each requested property (memory and/or CPU), with per-worker variants when
/// a skew breakdown was requested; duplicate properties contribute columns
/// only once.
pub fn describe_explain_analyze_cluster(
    _scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
) -> Result<StatementDesc, PlanError> {
    // `AS SQL` renders a single string column.
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;

    // Cluster-wide analyses identify each object by name and global ID.
    let mut relation_desc = RelationDesc::builder()
        .with_column("object", SqlScalarType::String.nullable(false))
        .with_column("global_id", SqlScalarType::String.nullable(false));

    // Skew analyses break results down per worker.
    if skew {
        relation_desc =
            relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
    }

    // Add columns for each property at most once, in request order.
    let mut seen_properties = BTreeSet::new();
    for property in properties {
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            ExplainAnalyzeComputationProperty::Memory if skew => {
                // Per-worker memory/record columns plus worst-case operator
                // ratios against the average.
                let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                relation_desc = relation_desc
                    .with_column("max_operator_memory_ratio", numeric.clone())
                    .with_column("worker_memory", SqlScalarType::String.nullable(true))
                    .with_column("avg_memory", SqlScalarType::String.nullable(true))
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column("max_operator_records_ratio", numeric.clone())
                    .with_column("worker_records", numeric.clone())
                    .with_column("avg_records", numeric.clone())
                    .with_column("total_records", numeric);
            }
            ExplainAnalyzeComputationProperty::Memory => {
                // Without skew, only totals are reported.
                relation_desc = relation_desc
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column(
                        "total_records",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    );
            }
            ExplainAnalyzeComputationProperty::Cpu if skew => {
                // Per-worker elapsed-time columns precede the total.
                relation_desc = relation_desc
                    .with_column(
                        "max_operator_cpu_ratio",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    )
                    .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
            ExplainAnalyzeComputationProperty::Cpu => {
                relation_desc = relation_desc
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
        }
    }

    Ok(StatementDesc::new(Some(relation_desc.finish())))
}
519
520pub fn describe_explain_timestamp(
521 scx: &StatementContext,
522 ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
523) -> Result<StatementDesc, PlanError> {
524 let relation_desc = RelationDesc::builder()
525 .with_column("Timestamp", SqlScalarType::String.nullable(false))
526 .finish();
527
528 Ok(StatementDesc::new(Some(relation_desc))
529 .with_params(describe_select(scx, select)?.param_types))
530}
531
532pub fn describe_explain_schema(
533 _: &StatementContext,
534 ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
535) -> Result<StatementDesc, PlanError> {
536 let relation_desc = RelationDesc::builder()
537 .with_column("Schema", SqlScalarType::String.nullable(false))
538 .finish();
539 Ok(StatementDesc::new(Some(relation_desc)))
540}
541
// Generates `ExplainPlanOptionExtracted`, the struct of typed `EXPLAIN ...
// WITH (...)` option values (one field per option, with the defaults listed
// here). It is converted into an `ExplainConfig` by the `TryFrom` impl below
// and consumed in `plan_explain_plan`.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
584
/// Converts the extracted `EXPLAIN ... WITH (...)` options into an
/// [`ExplainConfig`], applying option interactions and environment-dependent
/// defaults.
impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
    type Error = PlanError;

    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
        // `RAW` is shorthand that turns on both `RAW PLANS` and `RAW SYNTAX`.
        if v.raw {
            v.raw_plans = true;
            v.raw_syntax = true;
        }

        // Some unset options default to enabled when soft assertions are off
        // (i.e., outside debug/test builds).
        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();

        Ok(ExplainConfig {
            arity: v.arity.unwrap_or(enable_on_prod),
            cardinality: v.cardinality,
            column_names: v.column_names,
            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
            // Forced off whenever raw plans are requested.
            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
            join_impls: v.join_implementations,
            keys: v.keys,
            // Forced off whenever raw plans are requested.
            linear_chains: !v.raw_plans && v.linear_chains,
            no_fast_path: v.no_fast_path,
            no_notices: v.no_notices,
            node_ids: v.node_identifiers,
            non_negative: v.non_negative,
            raw_plans: v.raw_plans,
            raw_syntax: v.raw_syntax,
            // Overwritten later from the statement's output format; see
            // `plan_explain_plan`.
            verbose_syntax: false,
            redacted: v.redacted,
            subtree_size: v.subtree_size,
            equivalences: v.equivalences,
            timing: v.timing,
            types: v.types,
            // Optimizer feature overrides requested via WITH options; fields
            // not exposed as options keep their defaults.
            features: OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: Default::default(),
                enable_eager_delta_joins: v.enable_eager_delta_joins,
                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
                enable_consolidate_after_union_negate: Default::default(),
                enable_reduce_mfp_fusion: Default::default(),
                enable_cardinality_estimates: Default::default(),
                persist_fast_path_limit: Default::default(),
                reoptimize_imported_views: v.reoptimize_imported_views,
                enable_reduce_reduction: Default::default(),
                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse: v
                    .enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: Default::default(),
                enable_dequadratic_eqprop_map: Default::default(),
                enable_eq_classes_withholding_errors: Default::default(),
                enable_fast_path_plan_insights: Default::default(),
                enable_repr_typecheck: Default::default(),
            },
        })
    }
}
646
647fn plan_explainee(
648 scx: &StatementContext,
649 explainee: Explainee<Aug>,
650 params: &Params,
651) -> Result<plan::Explainee, PlanError> {
652 use crate::plan::ExplaineeStatement;
653
654 let is_replan = matches!(
655 explainee,
656 Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
657 );
658
659 let explainee = match explainee {
660 Explainee::View(name) | Explainee::ReplanView(name) => {
661 let item = scx.get_item_by_resolved_name(&name)?;
662 let item_type = item.item_type();
663 if item_type != CatalogItemType::View {
664 sql_bail!("Expected {name} to be a view, not a {item_type}");
665 }
666 match is_replan {
667 true => crate::plan::Explainee::ReplanView(item.id()),
668 false => crate::plan::Explainee::View(item.id()),
669 }
670 }
671 Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
672 let item = scx.get_item_by_resolved_name(&name)?;
673 let item_type = item.item_type();
674 if item_type != CatalogItemType::MaterializedView {
675 sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
676 }
677 match is_replan {
678 true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
679 false => crate::plan::Explainee::MaterializedView(item.id()),
680 }
681 }
682 Explainee::Index(name) | Explainee::ReplanIndex(name) => {
683 let item = scx.get_item_by_resolved_name(&name)?;
684 let item_type = item.item_type();
685 if item_type != CatalogItemType::Index {
686 sql_bail!("Expected {name} to be an index, not a {item_type}");
687 }
688 match is_replan {
689 true => crate::plan::Explainee::ReplanIndex(item.id()),
690 false => crate::plan::Explainee::Index(item.id()),
691 }
692 }
693 Explainee::Select(select, broken) => {
694 let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
695 crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
696 }
697 Explainee::CreateView(mut stmt, broken) => {
698 if stmt.if_exists != IfExistsBehavior::Skip {
699 stmt.if_exists = IfExistsBehavior::Skip;
704 } else {
705 sql_bail!(
706 "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
707 (the behavior is implied within the scope of an enclosing EXPLAIN)"
708 );
709 }
710
711 let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
712 sql_bail!("expected CreateViewPlan plan");
713 };
714
715 crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
716 }
717 Explainee::CreateMaterializedView(mut stmt, broken) => {
718 if stmt.if_exists != IfExistsBehavior::Skip {
719 stmt.if_exists = IfExistsBehavior::Skip;
724 } else {
725 sql_bail!(
726 "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
727 (the behavior is implied within the scope of an enclosing EXPLAIN)"
728 );
729 }
730
731 let Plan::CreateMaterializedView(plan) =
732 ddl::plan_create_materialized_view(scx, *stmt)?
733 else {
734 sql_bail!("expected CreateMaterializedViewPlan plan");
735 };
736
737 crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
738 broken,
739 plan,
740 })
741 }
742 Explainee::CreateIndex(mut stmt, broken) => {
743 if !stmt.if_not_exists {
744 stmt.if_not_exists = true;
747 } else {
748 sql_bail!(
749 "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
750 (the behavior is implied within the scope of an enclosing EXPLAIN)"
751 );
752 }
753
754 let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
755 sql_bail!("expected CreateIndexPlan plan");
756 };
757
758 crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
759 }
760 };
761
762 Ok(explainee)
763}
764
765pub fn plan_explain_plan(
766 scx: &StatementContext,
767 explain: ExplainPlanStatement<Aug>,
768 params: &Params,
769) -> Result<Plan, PlanError> {
770 let (format, verbose_syntax) = match explain.format() {
771 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
772 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
773 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
774 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
775 };
776 let stage = explain.stage();
777
778 let mut config = {
780 let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
781
782 if !scx.catalog.system_vars().persist_stats_filter_enabled() {
783 with_options.filter_pushdown = Some(false);
785 }
786
787 ExplainConfig::try_from(with_options)?
788 };
789 config.verbose_syntax = verbose_syntax;
790
791 let explainee = plan_explainee(scx, explain.explainee, params)?;
792
793 Ok(Plan::ExplainPlan(ExplainPlanPlan {
794 stage,
795 format,
796 config,
797 explainee,
798 }))
799}
800
/// Plans an `EXPLAIN SCHEMA` statement by planning the embedded `CREATE
/// SINK` under a placeholder name and extracting the Avro schema for the
/// requested side (key or value).
///
/// Only Kafka sinks with an Avro value format are supported; requesting the
/// key schema of a sink without an Avro key format is an error.
pub fn plan_explain_schema(
    scx: &StatementContext,
    explain_schema: ExplainSinkSchemaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let ExplainSinkSchemaStatement {
        schema_for,
        format: _,
        mut statement,
    } = explain_schema;

    // Give the (anonymous) sink statement a placeholder name so it can be
    // planned like a regular CREATE SINK.
    statement.name = Some(UnresolvedItemName::qualified(&[
        ident!("mz_catalog"),
        ident!("mz_explain_schema"),
    ]));

    // Resolve AVRO DOC ON options against the sink's source item before
    // planning.
    crate::pure::purify_create_sink_avro_doc_on_options(
        scx.catalog,
        *statement.from.item_id(),
        &mut statement.format,
    )?;

    match ddl::plan_create_sink(scx, statement)? {
        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
            // Only Kafka sinks whose value format is Avro carry a schema we
            // can render.
            StorageSinkConnection::Kafka(KafkaSinkConnection {
                format:
                    KafkaSinkFormat {
                        key_format,
                        value_format:
                            KafkaSinkFormatType::Avro {
                                schema: value_schema,
                                ..
                            },
                        ..
                    },
                ..
            }) => {
                let schema = match schema_for {
                    // The key schema exists only if the key format is Avro.
                    ExplainSinkSchemaFor::Key => key_format
                        .and_then(|f| match f {
                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
                            _ => None,
                        })
                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
                    ExplainSinkSchemaFor::Value => value_schema,
                };

                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
                    sink_from: sink.from,
                    json_schema: schema,
                }))
            }
            _ => bail_unsupported!(
                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
            ),
        },
        _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
    }
}
863
864pub fn plan_explain_pushdown(
865 scx: &StatementContext,
866 statement: ExplainPushdownStatement<Aug>,
867 params: &Params,
868) -> Result<Plan, PlanError> {
869 scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
870 let explainee = plan_explainee(scx, statement.explainee, params)?;
871 Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
872}
873
874pub fn plan_explain_analyze_object(
875 scx: &StatementContext,
876 statement: ExplainAnalyzeObjectStatement<Aug>,
877 params: &Params,
878) -> Result<Plan, PlanError> {
879 let explainee_name = statement
880 .explainee
881 .name()
882 .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
883 .full_name_str();
884 let explainee = plan_explainee(scx, statement.explainee, params)?;
885
886 match explainee {
887 plan::Explainee::Index(_index_id) => (),
888 plan::Explainee::MaterializedView(_item_id) => (),
889 _ => {
890 return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
891 }
892 };
893
894 let mut ctes = Vec::with_capacity(4); let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
909 let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
910 let mut predicates = vec![format!("mo.name = '{}'", explainee_name)];
911 let mut order_by = vec!["mlm.lir_id DESC"];
912
913 match statement.properties {
914 ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
915 properties,
916 skew,
917 }) => {
918 let mut worker_id = None;
919 let mut seen_properties = BTreeSet::new();
920 for property in properties {
921 if !seen_properties.insert(property) {
923 continue;
924 }
925
926 match property {
927 ExplainAnalyzeComputationProperty::Memory => {
928 ctes.push((
929 "summary_memory",
930 r#"
931 SELECT mlm.global_id AS global_id,
932 mlm.lir_id AS lir_id,
933 SUM(mas.size) AS total_memory,
934 SUM(mas.records) AS total_records,
935 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
936 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
937 FROM mz_introspection.mz_lir_mapping mlm
938 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
939 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
940 ON (mas.operator_id = valid_id)
941GROUP BY mlm.global_id, mlm.lir_id"#,
942 ));
943 from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");
944
945 if skew {
946 ctes.push((
947 "per_worker_memory",
948 r#"
949 SELECT mlm.global_id AS global_id,
950 mlm.lir_id AS lir_id,
951 mas.worker_id AS worker_id,
952 SUM(mas.size) AS worker_memory,
953 SUM(mas.records) AS worker_records
954 FROM mz_introspection.mz_lir_mapping mlm
955 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
956 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
957 ON (mas.operator_id = valid_id)
958GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
959 ));
960 from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");
961
962 if let Some(worker_id) = worker_id {
963 predicates.push(format!("pwm.worker_id = {worker_id}"));
964 } else {
965 worker_id = Some("pwm.worker_id");
966 columns.push("pwm.worker_id AS worker_id");
967 order_by.push("worker_id");
968 }
969
970 columns.extend([
971 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
972 "pg_size_pretty(pwm.worker_memory) AS worker_memory",
973 "pg_size_pretty(sm.avg_memory) AS avg_memory",
974 "pg_size_pretty(sm.total_memory) AS total_memory",
975 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
976 "pwm.worker_records AS worker_records",
977 "sm.avg_records AS avg_records",
978 "sm.total_records AS total_records",
979 ]);
980 } else {
981 columns.extend([
982 "pg_size_pretty(sm.total_memory) AS total_memory",
983 "sm.total_records AS total_records",
984 ]);
985 }
986 }
987 ExplainAnalyzeComputationProperty::Cpu => {
988 ctes.push((
989 "summary_cpu",
990 r#"
991 SELECT mlm.global_id AS global_id,
992 mlm.lir_id AS lir_id,
993 SUM(mse.elapsed_ns) AS total_ns,
994 CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
995 FROM mz_introspection.mz_lir_mapping mlm
996 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
997 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
998 ON (mse.id = valid_id)
999GROUP BY mlm.global_id, mlm.lir_id"#,
1000 ));
1001 from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");
1002
1003 if skew {
1004 ctes.push((
1005 "per_worker_cpu",
1006 r#"
1007 SELECT mlm.global_id AS global_id,
1008 mlm.lir_id AS lir_id,
1009 mse.worker_id AS worker_id,
1010 SUM(mse.elapsed_ns) AS worker_ns
1011 FROM mz_introspection.mz_lir_mapping mlm
1012 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1013 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1014 ON (mse.id = valid_id)
1015GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1016 ));
1017 from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");
1018
1019 if let Some(worker_id) = worker_id {
1020 predicates.push(format!("pwc.worker_id = {worker_id}"));
1021 } else {
1022 worker_id = Some("pwc.worker_id");
1023 columns.push("pwc.worker_id AS worker_id");
1024 order_by.push("worker_id");
1025 }
1026
1027 columns.extend([
1028 "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
1029 "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
1030 "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
1031 ]);
1032 }
1033 columns.push(
1034 "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
1035 );
1036 }
1037 }
1038 }
1039 }
1040 ExplainAnalyzeProperty::Hints => {
1041 columns.extend([
1042 "megsa.levels AS levels",
1043 "megsa.to_cut AS to_cut",
1044 "megsa.hint AS hint",
1045 "pg_size_pretty(megsa.savings) AS savings",
1046 ]);
1047 from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
1048 "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
1049 mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
1050 }
1051 }
1052
1053 from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");
1054
1055 let ctes = if !ctes.is_empty() {
1056 format!(
1057 "WITH {}",
1058 separated(
1059 ",\n",
1060 ctes.iter()
1061 .map(|(name, defn)| format!("{name} AS ({defn})"))
1062 )
1063 )
1064 } else {
1065 String::new()
1066 };
1067 let columns = separated(", ", columns);
1068 let from = separated(" ", from);
1069 let predicates = separated(" AND ", predicates);
1070 let order_by = separated(", ", order_by);
1071 let query = format!(
1072 r#"{ctes}
1073SELECT {columns}
1074FROM {from}
1075WHERE {predicates}
1076ORDER BY {order_by}"#
1077 );
1078
1079 if statement.as_sql {
1080 let rows = vec![Row::pack_slice(&[Datum::String(
1081 &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1082 PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1083 })?,
1084 )])];
1085 let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1086
1087 Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1088 } else {
1089 let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1090 show_select.plan()
1091 }
1092}
1093
/// Plans `EXPLAIN ANALYZE ... FOR CLUSTER`, which reports memory and/or CPU
/// usage for every mappable object on the cluster.
///
/// The plan is produced by synthesizing a SQL query over `mz_introspection`
/// relations: each requested computation property appends CTEs, output
/// columns, join clauses, predicates, and sort keys to a set of accumulators,
/// which are then stitched into a single `SELECT`. With `AS SQL`, the
/// pretty-printed query text itself is returned as a one-row result instead
/// of being planned.
pub fn plan_explain_analyze_cluster(
    scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
    _params: &Params,
) -> Result<Plan, PlanError> {
    // Accumulators for the pieces of the synthesized query. Every row comes
    // from `mz_mappable_objects`, keyed by name and global id.
    let mut ctes = Vec::with_capacity(4); let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
    let mut predicates = vec![];
    let mut order_by = vec![];

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
    // Once a skew property has introduced the worker_id output column, this
    // holds the column expression naming it (e.g. "om.worker_id").
    let mut worker_id = None;
    // Process each requested property at most once, ignoring duplicates.
    let mut seen_properties = BTreeSet::new();
    for property in properties {
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            ExplainAnalyzeComputationProperty::Memory => {
                if skew {
                    // Only the first skew-enabled property introduces the
                    // worker_id output column; later ones instead constrain
                    // their worker_id to equal the established one.
                    // `set_worker_id` remembers whether this arm added the
                    // column so it can be appended to the ORDER BY below.
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        predicates.push(format!("om.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("om.worker_id");
                        columns.push("om.worker_id AS worker_id");
                        set_worker_id = true; };

                    // Per-operator totals and cross-worker averages of
                    // arrangement sizes/records.
                    ctes.push((
                        "per_operator_memory_summary",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mas.size) AS total_memory,
       SUM(mas.records) AS total_records,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
FROM mz_introspection.mz_lir_mapping mlm
        CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
        JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
        ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    // Per-operator, per-worker memory and record counts.
                    ctes.push((
                        "per_operator_memory_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mas.worker_id AS worker_id,
       SUM(mas.size) AS worker_memory,
       SUM(mas.records) AS worker_records
FROM mz_introspection.mz_lir_mapping mlm
        CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
        JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
        ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                    ));

                    // Each worker's share relative to the cross-worker
                    // average, per operator.
                    ctes.push((
                        "per_operator_memory_ratios",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.lir_id AS lir_id,
       pompw.worker_id AS worker_id,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
  FROM per_operator_memory_per_worker pompw
  JOIN per_operator_memory_summary poms
 USING (global_id, lir_id)
"#,
                    ));

                    // Roll operators up to (object, worker), keeping the
                    // worst per-operator ratios.
                    ctes.push((
                        "object_memory",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.worker_id AS worker_id,
       MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
       MAX(pomr.records_ratio) AS max_operator_records_ratio,
       SUM(pompw.worker_memory) AS worker_memory,
       SUM(pompw.worker_records) AS worker_records
FROM per_operator_memory_per_worker pompw
  JOIN per_operator_memory_ratios pomr
 USING (global_id, worker_id, lir_id)
GROUP BY pompw.global_id, pompw.worker_id
"#,
                    ));

                    // Object-level totals and per-worker averages.
                    ctes.push(("object_average_memory", r#"
SELECT om.global_id AS global_id,
       SUM(om.worker_memory) AS total_memory,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
       SUM(om.worker_records) AS total_records,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
  FROM object_memory om
GROUP BY om.global_id"#));

                    from.push("LEFT JOIN object_memory om USING (global_id)");
                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");

                    columns.extend([
                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
                        "pg_size_pretty(om.worker_memory) AS worker_memory",
                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
                        "pg_size_pretty(oam.total_memory) AS total_memory",
                        "om.max_operator_records_ratio AS max_operator_records_ratio",
                        "om.worker_records AS worker_records",
                        "oam.avg_records AS avg_records",
                        "oam.total_records AS total_records",
                    ]);

                    // Most skewed / largest objects first.
                    order_by.extend([
                        "max_operator_memory_ratio DESC",
                        "max_operator_records_ratio DESC",
                        "worker_memory DESC",
                        "worker_records DESC",
                    ]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // Without SKEW, only object-level totals are reported.
                    ctes.push((
                        "per_operator_memory_totals",
                        r#"
 SELECT mlm.global_id AS global_id,
        mlm.lir_id AS lir_id,
        SUM(mas.size) AS total_memory,
        SUM(mas.records) AS total_records
   FROM mz_introspection.mz_lir_mapping mlm
  CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
   JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
     ON (mas.operator_id = valid_id)
  GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_memory_totals",
                        r#"
SELECT pomt.global_id AS global_id,
       SUM(pomt.total_memory) AS total_memory,
       SUM(pomt.total_records) AS total_records
FROM per_operator_memory_totals pomt
GROUP BY pomt.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
                    columns.extend([
                        "pg_size_pretty(omt.total_memory) AS total_memory",
                        "omt.total_records AS total_records",
                    ]);
                    order_by.extend(["total_memory DESC", "total_records DESC"]);
                }
            }
            // Same structure as the Memory arm, but over scheduling elapsed
            // time (`mz_scheduling_elapsed_per_worker`).
            ExplainAnalyzeComputationProperty::Cpu => {
                if skew {
                    // See the Memory arm for the worker_id protocol.
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        predicates.push(format!("oc.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("oc.worker_id");
                        columns.push("oc.worker_id AS worker_id");
                        set_worker_id = true; };

                    // Per-operator totals and cross-worker average elapsed ns.
                    ctes.push((
                        "per_operator_cpu_summary",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mse.elapsed_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
FROM mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
  JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
    ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
));

                    // Per-operator, per-worker elapsed ns.
                    ctes.push((
                        "per_operator_cpu_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mse.worker_id AS worker_id,
       SUM(mse.elapsed_ns) AS worker_ns
FROM mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
  JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
    ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                    ));

                    // Each worker's share relative to the cross-worker
                    // average, per operator.
                    ctes.push((
                        "per_operator_cpu_ratios",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.lir_id AS lir_id,
       pocpw.worker_id AS worker_id,
       CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
FROM per_operator_cpu_per_worker pocpw
  JOIN per_operator_cpu_summary pocs
 USING (global_id, lir_id)
"#,
                    ));

                    // Roll operators up to (object, worker), keeping the
                    // worst per-operator ratio.
                    ctes.push((
                        "object_cpu",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.worker_id AS worker_id,
       MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
       SUM(pocpw.worker_ns) AS worker_ns
FROM per_operator_cpu_per_worker pocpw
  JOIN per_operator_cpu_ratios pomr
 USING (global_id, worker_id, lir_id)
GROUP BY pocpw.global_id, pocpw.worker_id
"#,
                    ));

                    // Object-level totals and per-worker averages.
                    ctes.push((
                        "object_average_cpu",
                        r#"
SELECT oc.global_id AS global_id,
       SUM(oc.worker_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
  FROM object_cpu oc
GROUP BY oc.global_id"#,));

                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");

                    columns.extend([
                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
                    ]);

                    order_by.extend(["max_operator_cpu_ratio DESC", "worker_elapsed DESC"]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // Without SKEW, only object-level totals are reported.
                    ctes.push((
                        "per_operator_cpu_totals",
                        r#"
 SELECT mlm.global_id AS global_id,
        mlm.lir_id AS lir_id,
        SUM(mse.elapsed_ns) AS total_ns
   FROM mz_introspection.mz_lir_mapping mlm
  CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
   JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
     ON (mse.id = valid_id)
  GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_cpu_totals",
                        r#"
SELECT poct.global_id AS global_id,
       SUM(poct.total_ns) AS total_ns
FROM per_operator_cpu_totals poct
GROUP BY poct.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
                    columns
                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
                    order_by.extend(["total_elapsed DESC"]);
                }
            }
        }
    }

    // Stitch the accumulated pieces into one SQL string.
    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    let predicates = if !predicates.is_empty() {
        format!("WHERE {}", separated(" AND ", predicates))
    } else {
        String::new()
    };
    // Always end the sort order with the object name.
    order_by.push("mo.name DESC");
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
{predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // AS SQL: return the pretty-printed introspection query itself as a
        // single string row instead of running it.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        // Plan the synthesized query the same way SHOW statements are planned.
        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        show_select.plan()
    }
}
1459
1460pub fn plan_explain_timestamp(
1461 scx: &StatementContext,
1462 explain: ExplainTimestampStatement<Aug>,
1463) -> Result<Plan, PlanError> {
1464 let (format, _verbose_syntax) = match explain.format() {
1465 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1466 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1467 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1468 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1469 };
1470
1471 let raw_plan = {
1472 let query::PlannedRootQuery {
1473 expr: raw_plan,
1474 desc: _,
1475 finishing: _,
1476 scope: _,
1477 } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1478 if raw_plan.contains_parameters()? {
1479 return Err(PlanError::ParameterNotAllowed(
1480 "EXPLAIN TIMESTAMP".to_string(),
1481 ));
1482 }
1483
1484 raw_plan
1485 };
1486 let when = query::plan_as_of(scx, explain.select.as_of)?;
1487
1488 Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1489 format,
1490 raw_plan,
1491 when,
1492 }))
1493}
1494
1495#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."]
1498pub fn plan_query(
1499 scx: &StatementContext,
1500 query: Query<Aug>,
1501 params: &Params,
1502 lifetime: QueryLifetime,
1503) -> Result<query::PlannedRootQuery<MirRelationExpr>, PlanError> {
1504 let query::PlannedRootQuery {
1505 mut expr,
1506 desc,
1507 finishing,
1508 scope,
1509 } = query::plan_root_query(scx, query, lifetime)?;
1510 expr.bind_parameters(scx, lifetime, params)?;
1511
1512 Ok(query::PlannedRootQuery {
1513 expr: expr.lower(scx.catalog.system_vars(), None)?,
1515 desc,
1516 finishing,
1517 scope,
1518 })
1519}
1520
// Typed extraction of SUBSCRIBE options (SNAPSHOT, PROGRESS) into a
// `SubscribeOptionExtracted` struct with `Option<bool>` fields.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1522
/// Computes the [`StatementDesc`] (output column names and types) of a
/// `SUBSCRIBE` statement.
///
/// Every subscribe output begins with an `mz_timestamp` column, followed by
/// `mz_progressed` when the PROGRESS option is set, followed by columns
/// determined by the requested output shape (diffs vs. upsert/debezium
/// envelopes).
pub fn describe_subscribe(
    scx: &StatementContext,
    stmt: SubscribeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // Relation description of the thing being subscribed to: either a
    // catalog item or an ad-hoc query.
    let relation_desc = match stmt.relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            item.desc(&scx.catalog.resolve_full_name(item.name()))?
                .into_owned()
        }
        SubscribeRelation::Query(query) => {
            let query::PlannedRootQuery { desc, .. } =
                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
            desc
        }
    };
    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
    let progress = progress.unwrap_or(false);
    // All outputs lead with a non-null mz_timestamp column.
    let mut desc = RelationDesc::builder().with_column(
        "mz_timestamp",
        SqlScalarType::Numeric {
            max_scale: Some(NumericMaxScale::ZERO),
        }
        .nullable(false),
    );
    // With PROGRESS, an mz_progressed column follows.
    if progress {
        desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
    }

    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
    match stmt.output {
        // Diff output: mz_diff, then the relation's own columns. With
        // PROGRESS, every data column becomes nullable.
        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
            desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
            for (name, mut ty) in relation_desc.into_iter() {
                if progress {
                    ty.nullable = true;
                }
                desc = desc.with_column(name, ty);
            }
        }
        // Envelope output: mz_state, then the key columns, then the value
        // columns (always nullable); Debezium additionally prefixes a set of
        // `before_*` columns.
        SubscribeOutput::EnvelopeUpsert { key_columns }
        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
            desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
            let key_columns = key_columns
                .into_iter()
                .map(normalize::column_name)
                .collect_vec();
            let mut before_values_desc = RelationDesc::builder();
            let mut after_values_desc = RelationDesc::builder();

            // Key columns must exist in the underlying relation; with
            // PROGRESS they become nullable.
            for column_name in &key_columns {
                let mut column_ty = relation_desc
                    .get_by_name(column_name)
                    .map(|(_pos, ty)| ty.clone())
                    .ok_or_else(|| PlanError::UnknownColumn {
                        table: None,
                        column: column_name.clone(),
                        similar: Box::new([]),
                    })?;
                if progress {
                    column_ty.nullable = true;
                }
                desc = desc.with_column(column_name, column_ty);
            }

            // Non-key columns become the value columns; for Debezium they
            // are emitted twice, as `before_*` and `after_*`.
            for (mut name, mut ty) in relation_desc
                .into_iter()
                .filter(|(name, _ty)| !key_columns.contains(name))
            {
                ty.nullable = true;
                before_values_desc =
                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
                if debezium {
                    name = format!("after_{}", name).into();
                }
                after_values_desc = after_values_desc.with_column(name, ty);
            }

            if debezium {
                desc = desc.concat(before_values_desc);
            }
            desc = desc.concat(after_values_desc);
        }
    }
    Ok(StatementDesc::new(Some(desc.finish())))
}
1612
/// Plans a `SUBSCRIBE` statement into a [`SubscribePlan`].
///
/// Resolves the subscribed relation (catalog item or ad-hoc query), plans
/// the `AS OF`/`UP TO` bounds, and plans the requested output shape
/// (diffs, upsert/debezium envelopes, or WITHIN TIMESTAMP ORDER BY).
pub fn plan_subscribe(
    scx: &StatementContext,
    SubscribeStatement {
        relation,
        options,
        as_of,
        up_to,
        output,
    }: SubscribeStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<Plan, PlanError> {
    let (from, desc, scope) = match relation {
        SubscribeRelation::Name(name) => {
            let entry = scx.get_item_by_resolved_name(&name)?;
            // Items without a relation description (reported via the item's
            // type) cannot be subscribed to.
            let desc = match entry.desc(&scx.catalog.resolve_full_name(entry.name())) {
                Ok(desc) => desc,
                Err(..) => sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    entry.item_type(),
                ),
            };
            let item_name = match name {
                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
                _ => None,
            };
            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
            (
                SubscribeFrom::Id(entry.global_id()),
                desc.into_owned(),
                scope,
            )
        }
        SubscribeRelation::Query(query) => {
            #[allow(deprecated)] let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?;
            // A subscribe query must come back with a trivial row-set
            // finishing (no ORDER BY/LIMIT to apply afterwards).
            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
                &query.finishing,
                query.desc.arity()
            ));
            let desc = query.desc.clone();
            (
                SubscribeFrom::Query {
                    expr: query.expr,
                    desc: query.desc,
                },
                desc,
                query.scope,
            )
        }
    };

    let when = query::plan_as_of(scx, as_of)?;
    let up_to = up_to
        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
        .transpose()?;

    // Expression context for planning the output modifiers (envelope keys,
    // WITHIN TIMESTAMP ORDER BY) against the subscribed relation.
    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
    let ecx = ExprContext {
        qcx: &qcx,
        name: "",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
    let output = match output {
        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
        SubscribeOutput::EnvelopeUpsert { key_columns } => {
            // Plan the KEY columns as an ORDER BY over the output columns;
            // anything that would need extra computed columns (map_exprs)
            // is rejected as an invalid key.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE UPSERT KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
            }
            plan::SubscribeOutput::EnvelopeUpsert {
                order_by_keys: order_by,
            }
        }
        // Same KEY treatment as ENVELOPE UPSERT, gated behind a feature flag.
        SubscribeOutput::EnvelopeDebezium { key_columns } => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE DEBEZIUM KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
            }
            plan::SubscribeOutput::EnvelopeDebezium {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
            // Expose mz_diff as an extra leading output column (shifting the
            // relation's columns by one) so the ORDER BY may reference it.
            let mz_diff = "mz_diff".into();
            let output_columns = std::iter::once((0, &mz_diff))
                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
                .collect_vec();
            match query::plan_order_by_exprs(
                &ExprContext {
                    name: "WITHIN TIMESTAMP ORDER BY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            ) {
                // NOTE(review): an unknown-column error specifically about
                // `mz_diff` is reported as an invalid ORDER BY — presumably
                // hit when `mz_diff` appears in a position the planner cannot
                // resolve against the scope; confirm against
                // `plan_order_by_exprs`.
                Err(PlanError::UnknownColumn {
                    table: None,
                    column,
                    similar: _,
                }) if &column == &mz_diff => {
                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                }
                Err(e) => return Err(e),
                Ok((order_by, map_exprs)) => {
                    // As with envelope keys, computed columns are not allowed.
                    if !map_exprs.is_empty() {
                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                    }

                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
                }
            }
        }
    };

    // SNAPSHOT defaults to true; PROGRESS defaults to false.
    let SubscribeOptionExtracted {
        progress, snapshot, ..
    } = options.try_into()?;
    Ok(Plan::Subscribe(SubscribePlan {
        from,
        when,
        up_to,
        with_snapshot: snapshot.unwrap_or(true),
        copy_to,
        emit_progress: progress.unwrap_or(false),
        output,
    }))
}
1786
1787pub fn describe_copy_from_table(
1788 scx: &StatementContext,
1789 table_name: <Aug as AstInfo>::ItemName,
1790 columns: Vec<Ident>,
1791) -> Result<StatementDesc, PlanError> {
1792 let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1793 Ok(StatementDesc::new(Some(desc)))
1794}
1795
1796pub fn describe_copy_item(
1797 scx: &StatementContext,
1798 object_name: <Aug as AstInfo>::ItemName,
1799 columns: Vec<Ident>,
1800) -> Result<StatementDesc, PlanError> {
1801 let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1802 Ok(StatementDesc::new(Some(desc)))
1803}
1804
1805pub fn describe_copy(
1806 scx: &StatementContext,
1807 CopyStatement {
1808 relation,
1809 direction,
1810 ..
1811 }: CopyStatement<Aug>,
1812) -> Result<StatementDesc, PlanError> {
1813 Ok(match (relation, direction) {
1814 (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1815 describe_copy_item(scx, name, columns)?
1816 }
1817 (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1818 describe_copy_from_table(scx, name, columns)?
1819 }
1820 (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1821 (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1822 }
1823 .with_is_copy())
1824}
1825
1826fn plan_copy_to_expr(
1827 scx: &StatementContext,
1828 select_plan: SelectPlan,
1829 desc: RelationDesc,
1830 to: &Expr<Aug>,
1831 format: CopyFormat,
1832 options: CopyOptionExtracted,
1833) -> Result<Plan, PlanError> {
1834 let conn_id = match options.aws_connection {
1835 Some(conn_id) => CatalogItemId::from(conn_id),
1836 None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1837 };
1838 let connection = scx.get_item(&conn_id).connection()?;
1839
1840 match connection {
1841 mz_storage_types::connections::Connection::Aws(_) => {}
1842 _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1843 }
1844
1845 let format = match format {
1846 CopyFormat::Csv => {
1847 let quote = extract_byte_param_value(options.quote, "quote")?;
1848 let escape = extract_byte_param_value(options.escape, "escape")?;
1849 let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1850 S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1851 CopyCsvFormatParams::try_new(
1852 delimiter,
1853 quote,
1854 escape,
1855 options.header,
1856 options.null,
1857 )
1858 .map_err(|e| sql_err!("{}", e))?,
1859 ))
1860 }
1861 CopyFormat::Parquet => {
1862 ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
1864 S3SinkFormat::Parquet
1865 }
1866 CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1867 CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1868 };
1869
1870 let mut to_expr = to.clone();
1872 transform_ast::transform(scx, &mut to_expr)?;
1873 let relation_type = RelationDesc::empty();
1874 let ecx = &ExprContext {
1875 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1876 name: "COPY TO target",
1877 scope: &Scope::empty(),
1878 relation_type: relation_type.typ(),
1879 allow_aggregates: false,
1880 allow_subqueries: false,
1881 allow_parameters: false,
1882 allow_windows: false,
1883 };
1884
1885 let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;
1886
1887 if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1888 sql_bail!(
1889 "MAX FILE SIZE cannot be less than {}",
1890 MIN_S3_SINK_FILE_SIZE
1891 );
1892 }
1893 if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1894 sql_bail!(
1895 "MAX FILE SIZE cannot be greater than {}",
1896 MAX_S3_SINK_FILE_SIZE
1897 );
1898 }
1899
1900 Ok(Plan::CopyTo(CopyToPlan {
1901 select_plan,
1902 desc,
1903 to,
1904 connection: connection.to_owned(),
1905 connection_id: conn_id,
1906 format,
1907 max_file_size: options.max_file_size.as_bytes(),
1908 }))
1909}
1910
/// Plans `COPY ... FROM`, ingesting data into `table_name` from stdin, a
/// URL, or S3 (when an AWS connection option is given).
fn plan_copy_from(
    scx: &StatementContext,
    target: &CopyTarget<Aug>,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // Rejects an option that is only meaningful in CSV mode.
    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
        match option {
            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
            None => Ok(()),
        }
    }

    // Determine where the data comes from.
    let source = match target {
        CopyTarget::Stdin => CopyFromSource::Stdin,
        CopyTarget::Expr(from) => {
            // Remote COPY FROM is feature-flagged.
            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;

            // The target expression must plan as a string (a URL or S3 URI).
            let mut from_expr = from.clone();
            transform_ast::transform(scx, &mut from_expr)?;
            let relation_type = RelationDesc::empty();
            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "COPY FROM target",
                scope: &Scope::empty(),
                relation_type: relation_type.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;

            // With an AWS connection the expression names an S3 location;
            // otherwise it is treated as a plain URL.
            match options.aws_connection {
                Some(conn_id) => {
                    let conn_id = CatalogItemId::from(conn_id);

                    let connection = match scx.get_item(&conn_id).connection()? {
                        mz_storage_types::connections::Connection::Aws(conn) => conn,
                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
                    };

                    CopyFromSource::AwsS3 {
                        uri: from,
                        connection,
                        connection_id: conn_id,
                    }
                }
                None => CopyFromSource::Url(from),
            }
        }
        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
    };

    // Translate format options into pg-style COPY parameters; CSV-only
    // options are rejected for TEXT.
    let params = match format {
        CopyFormat::Text => {
            only_available_with_csv(options.quote, "quote")?;
            only_available_with_csv(options.escape, "escape")?;
            only_available_with_csv(options.header, "HEADER")?;
            // TEXT defaults: tab delimiter, "\N" for NULL.
            let delimiter =
                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
            let null = match options.null {
                Some(null) => Cow::from(null),
                None => Cow::from("\\N"),
            };
            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
        }
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            )
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Parquet => CopyFormatParams::Parquet,
    };

    // FILES and PATTERN are mutually exclusive ...
    let filter = match (options.files, options.pattern) {
        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
        (Some(files), None) => Some(CopyFromFilter::Files(files)),
        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
        (None, None) => None,
    };

    // ... and neither makes sense when reading from stdin.
    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
    }

    let table_name_string = table_name.full_name_str();

    // Resolve the target table; an MFP mapping source rows into the table
    // is required here.
    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;

    let Some(mfp) = maybe_mfp else {
        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
    };

    Ok(Plan::CopyFrom(CopyFromPlan {
        target_id: id,
        target_name: table_name_string,
        source,
        columns,
        source_desc,
        mfp,
        params,
        filter,
    }))
}
2031
2032fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2033 match v {
2034 Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2035 Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2036 None => Ok(None),
2037 }
2038}
2039
// Typed extraction of COPY options into a `CopyOptionExtracted` struct.
// `MAX FILE SIZE` defaults to 256 MB; all other options are optional.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
2053
2054pub fn plan_copy(
2055 scx: &StatementContext,
2056 CopyStatement {
2057 relation,
2058 direction,
2059 target,
2060 options,
2061 }: CopyStatement<Aug>,
2062) -> Result<Plan, PlanError> {
2063 let options = CopyOptionExtracted::try_from(options)?;
2064 let format = options
2067 .format
2068 .as_ref()
2069 .map(|format| match format.to_lowercase().as_str() {
2070 "text" => Ok(CopyFormat::Text),
2071 "csv" => Ok(CopyFormat::Csv),
2072 "binary" => Ok(CopyFormat::Binary),
2073 "parquet" => Ok(CopyFormat::Parquet),
2074 _ => sql_bail!("unknown FORMAT: {}", format),
2075 })
2076 .transpose()?;
2077
2078 match (&direction, &target) {
2079 (CopyDirection::To, CopyTarget::Stdout) => {
2080 if options.delimiter.is_some() {
2081 sql_bail!("COPY TO does not support DELIMITER option yet");
2082 }
2083 if options.quote.is_some() {
2084 sql_bail!("COPY TO does not support QUOTE option yet");
2085 }
2086 if options.null.is_some() {
2087 sql_bail!("COPY TO does not support NULL option yet");
2088 }
2089 match relation {
2090 CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
2091 CopyRelation::Select(stmt) => Ok(plan_select(
2092 scx,
2093 stmt,
2094 &Params::empty(),
2095 Some(format.unwrap_or(CopyFormat::Text)),
2096 )?),
2097 CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
2098 scx,
2099 stmt,
2100 &Params::empty(),
2101 Some(format.unwrap_or(CopyFormat::Text)),
2102 )?),
2103 }
2104 }
2105 (CopyDirection::From, target) => match relation {
2106 CopyRelation::Named { name, columns } => plan_copy_from(
2107 scx,
2108 target,
2109 name,
2110 columns,
2111 format.unwrap_or(CopyFormat::Text),
2112 options,
2113 ),
2114 _ => sql_bail!("COPY FROM {} not supported", target),
2115 },
2116 (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
2117 if !scx.catalog.active_role_id().is_system() {
2122 scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
2123 }
2124
2125 let format = match format {
2126 Some(inner) => inner,
2127 _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
2128 };
2129
2130 let stmt = match relation {
2131 CopyRelation::Named { name, columns } => {
2132 if !columns.is_empty() {
2133 sql_bail!(
2135 "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
2136 );
2137 }
2138 let query = Query {
2140 ctes: CteBlock::empty(),
2141 body: SetExpr::Table(name),
2142 order_by: vec![],
2143 limit: None,
2144 offset: None,
2145 };
2146 SelectStatement { query, as_of: None }
2147 }
2148 CopyRelation::Select(stmt) => {
2149 if !stmt.query.order_by.is_empty() {
2150 sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
2151 }
2152 stmt
2153 }
2154 _ => sql_bail!("COPY {} {} not supported", direction, target),
2155 };
2156
2157 let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
2158 plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
2159 }
2160 _ => sql_bail!("COPY {} {} not supported", direction, target),
2161 }
2162}