1use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19use mz_arrow_util::builder::ArrowBuilder;
20use mz_expr::RowSetFinishing;
21use mz_ore::num::NonNeg;
22use mz_ore::soft_panic_or_log;
23use mz_ore::str::separated;
24use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
25use mz_repr::adt::numeric::NumericMaxScale;
26use mz_repr::bytes::ByteSize;
27use mz_repr::explain::{ExplainConfig, ExplainFormat};
28use mz_repr::optimize::OptimizerFeatureOverrides;
29use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
30use mz_sql_parser::ast::{
31 CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
32 ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
33 ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
34 ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
35 SetExpr, SubscribeOutput, UnresolvedItemName,
36};
37use mz_sql_parser::ident;
38use mz_storage_types::sinks::{
39 KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
40 MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
41};
42
43use crate::ast::display::{AstDisplay, escaped_string_literal};
44use crate::ast::{
45 AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
46 DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
47 SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
48 UpdateStatement,
49};
50use crate::catalog::CatalogItemType;
51use crate::names::{Aug, ResolvedItemName};
52use crate::normalize;
53use crate::plan::query::{
54 ExprContext, QueryLifetime, negative_offset_error, offset_into_value, plan_as_of_or_up_to,
55 plan_expr,
56};
57use crate::plan::scope::Scope;
58use crate::plan::statement::show::ShowSelect;
59use crate::plan::statement::{StatementContext, StatementDesc, ddl};
60use crate::plan::{
61 self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
62 ExplainTimestampPlan, HirRelationExpr, side_effecting_func, transform_ast,
63};
64use crate::plan::{
65 CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
66 QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
67};
68use crate::plan::{CopyFromSource, with_options};
69use crate::session::vars::{self, DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF};
70
71pub fn describe_insert(
78 scx: &StatementContext,
79 InsertStatement {
80 table_name,
81 columns,
82 source,
83 returning,
84 }: InsertStatement<Aug>,
85) -> Result<StatementDesc, PlanError> {
86 let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
87 let desc = if returning.expr.is_empty() {
88 None
89 } else {
90 Some(returning.desc)
91 };
92 Ok(StatementDesc::new(desc))
93}
94
95pub fn plan_insert(
96 scx: &StatementContext,
97 InsertStatement {
98 table_name,
99 columns,
100 source,
101 returning,
102 }: InsertStatement<Aug>,
103 params: &Params,
104) -> Result<Plan, PlanError> {
105 let (id, mut expr, returning) =
106 query::plan_insert_query(scx, table_name, columns, source, returning)?;
107 expr.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
108 let returning = returning
109 .expr
110 .into_iter()
111 .map(|mut expr| {
112 expr.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
113 expr.lower_uncorrelated(scx.catalog.system_vars())
114 })
115 .collect::<Result<Vec<_>, _>>()?;
116
117 Ok(Plan::Insert(InsertPlan {
118 id,
119 values: expr,
120 returning,
121 }))
122}
123
124pub fn describe_delete(
125 scx: &StatementContext,
126 stmt: DeleteStatement<Aug>,
127) -> Result<StatementDesc, PlanError> {
128 query::plan_delete_query(scx, stmt)?;
129 Ok(StatementDesc::new(None))
130}
131
132pub fn plan_delete(
133 scx: &StatementContext,
134 stmt: DeleteStatement<Aug>,
135 params: &Params,
136) -> Result<Plan, PlanError> {
137 let rtw_plan = query::plan_delete_query(scx, stmt)?;
138 plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
139}
140
141pub fn describe_update(
142 scx: &StatementContext,
143 stmt: UpdateStatement<Aug>,
144) -> Result<StatementDesc, PlanError> {
145 query::plan_update_query(scx, stmt)?;
146 Ok(StatementDesc::new(None))
147}
148
149pub fn plan_update(
150 scx: &StatementContext,
151 stmt: UpdateStatement<Aug>,
152 params: &Params,
153) -> Result<Plan, PlanError> {
154 let rtw_plan = query::plan_update_query(scx, stmt)?;
155 plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
156}
157
158pub fn plan_read_then_write(
159 scx: &StatementContext,
160 kind: MutationKind,
161 params: &Params,
162 query::ReadThenWritePlan {
163 id,
164 mut selection,
165 finishing,
166 assignments,
167 }: query::ReadThenWritePlan,
168) -> Result<Plan, PlanError> {
169 selection.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
170 let mut assignments_outer = BTreeMap::new();
171 for (idx, mut set) in assignments {
172 set.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
173 let set = set.lower_uncorrelated(scx.catalog.system_vars())?;
174 assignments_outer.insert(idx, set);
175 }
176
177 Ok(Plan::ReadThenWrite(ReadThenWritePlan {
178 id,
179 selection,
180 finishing,
181 assignments: assignments_outer,
182 kind,
183 returning: Vec::new(),
184 }))
185}
186
187pub fn describe_select(
188 scx: &StatementContext,
189 stmt: SelectStatement<Aug>,
190) -> Result<StatementDesc, PlanError> {
191 if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
192 return Ok(StatementDesc::new(Some(desc)));
193 }
194
195 let query::PlannedRootQuery { desc, .. } =
196 query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
197 Ok(StatementDesc::new(Some(desc)))
198}
199
200pub fn plan_select(
201 scx: &StatementContext,
202 select: SelectStatement<Aug>,
203 params: &Params,
204 copy_to: Option<CopyFormat>,
205) -> Result<Plan, PlanError> {
206 if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
207 return Ok(Plan::SideEffectingFunc(f));
208 }
209
210 let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
211 Ok(Plan::Select(plan))
212}
213
214fn plan_select_inner(
215 scx: &StatementContext,
216 select: SelectStatement<Aug>,
217 params: &Params,
218 copy_to: Option<CopyFormat>,
219) -> Result<(SelectPlan, RelationDesc), PlanError> {
220 let when = query::plan_as_of(scx, select.as_of.clone())?;
221 let lifetime = QueryLifetime::OneShot;
222 let query::PlannedRootQuery {
223 mut expr,
224 desc,
225 finishing,
226 scope: _,
227 } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
228 expr.bind_parameters_and_simplify_offset(scx, lifetime, params)?;
229
230 let limit = match finishing.limit {
236 None => None,
237 Some(mut limit) => {
238 limit.bind_parameters_and_simplify_offset(scx, lifetime, params)?;
239 let Some(limit) = limit.as_literal() else {
241 sql_bail!(
242 "Top-level LIMIT must be a constant expression, got {}",
243 limit
244 )
245 };
246 match limit {
247 Datum::Null => None,
248 Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
249 _ => {
250 soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
251 sql_bail!("LIMIT must be a non-negative INT or NULL")
252 }
253 }
254 }
255 };
256 let offset = {
257 let mut offset = finishing.offset.clone();
258 offset.bind_parameters_and_simplify_offset(scx, lifetime, params)?;
259 let offset = offset_into_value(offset.take())?;
260 offset.try_into().map_err(|_| {
261 soft_panic_or_log!("unexpectedly negative OFFSET");
263 negative_offset_error(offset)
264 })?
265 };
266
267 if scx.is_feature_flag_enabled(&DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF)
271 && select.as_of.is_some()
272 && expr.contains_unmaterializable_except_temporal()?
273 {
274 bail_unsupported!("unmaterializable function (except `mz_now`) in an AS OF query");
275 }
276
277 let plan = SelectPlan {
278 source: expr,
279 when,
280 finishing: RowSetFinishing {
281 limit,
282 offset,
283 project: finishing.project,
284 order_by: finishing.order_by,
285 },
286 copy_to,
287 select: Some(Box::new(select)),
288 };
289
290 Ok((plan, desc))
291}
292
293pub fn describe_explain_plan(
294 scx: &StatementContext,
295 explain: ExplainPlanStatement<Aug>,
296) -> Result<StatementDesc, PlanError> {
297 let mut relation_desc = RelationDesc::builder();
298
299 match explain.stage() {
300 ExplainStage::RawPlan => {
301 let name = "Raw Plan";
302 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
303 }
304 ExplainStage::DecorrelatedPlan => {
305 let name = "Decorrelated Plan";
306 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
307 }
308 ExplainStage::LocalPlan => {
309 let name = "Locally Optimized Plan";
310 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
311 }
312 ExplainStage::GlobalPlan => {
313 let name = "Optimized Plan";
314 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
315 }
316 ExplainStage::PhysicalPlan => {
317 let name = "Physical Plan";
318 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
319 }
320 ExplainStage::Trace => {
321 relation_desc = relation_desc
322 .with_column("Time", SqlScalarType::UInt64.nullable(false))
323 .with_column("Path", SqlScalarType::String.nullable(false))
324 .with_column("Plan", SqlScalarType::String.nullable(false));
325 }
326 ExplainStage::PlanInsights => {
327 let name = "Plan Insights";
328 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
329 }
330 };
331 let relation_desc = relation_desc.finish();
332
333 Ok(
334 StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
335 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
336 _ => vec![],
337 }),
338 )
339}
340
341pub fn describe_explain_pushdown(
342 scx: &StatementContext,
343 statement: ExplainPushdownStatement<Aug>,
344) -> Result<StatementDesc, PlanError> {
345 let relation_desc = RelationDesc::builder()
346 .with_column("Source", SqlScalarType::String.nullable(false))
347 .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
348 .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
349 .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
350 .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
351 .finish();
352
353 Ok(
354 StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
355 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
356 _ => vec![],
357 }),
358 )
359}
360
361pub fn describe_explain_analyze_object(
362 _scx: &StatementContext,
363 statement: ExplainAnalyzeObjectStatement<Aug>,
364) -> Result<StatementDesc, PlanError> {
365 if statement.as_sql {
366 let relation_desc = RelationDesc::builder()
367 .with_column("SQL", SqlScalarType::String.nullable(false))
368 .finish();
369 return Ok(StatementDesc::new(Some(relation_desc)));
370 }
371
372 match statement.properties {
373 ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
374 properties,
375 skew,
376 }) => {
377 let mut relation_desc = RelationDesc::builder()
378 .with_column("operator", SqlScalarType::String.nullable(false));
379
380 if skew {
381 relation_desc =
382 relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
383 }
384
385 let mut seen_properties = BTreeSet::new();
386 for property in properties {
387 if !seen_properties.insert(property) {
389 continue;
390 }
391
392 match property {
393 ExplainAnalyzeComputationProperty::Memory if skew => {
394 let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
395 relation_desc = relation_desc
396 .with_column("memory_ratio", numeric.clone())
397 .with_column("worker_memory", SqlScalarType::String.nullable(true))
398 .with_column("avg_memory", SqlScalarType::String.nullable(true))
399 .with_column("total_memory", SqlScalarType::String.nullable(true))
400 .with_column("records_ratio", numeric.clone())
401 .with_column("worker_records", numeric.clone())
402 .with_column("avg_records", numeric.clone())
403 .with_column("total_records", numeric);
404 }
405 ExplainAnalyzeComputationProperty::Memory => {
406 relation_desc = relation_desc
407 .with_column("total_memory", SqlScalarType::String.nullable(true))
408 .with_column(
409 "total_records",
410 SqlScalarType::Numeric { max_scale: None }.nullable(true),
411 );
412 }
413 ExplainAnalyzeComputationProperty::Cpu => {
414 if skew {
415 relation_desc = relation_desc
416 .with_column(
417 "cpu_ratio",
418 SqlScalarType::Numeric { max_scale: None }.nullable(true),
419 )
420 .with_column(
421 "worker_elapsed",
422 SqlScalarType::Interval.nullable(true),
423 )
424 .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
425 }
426 relation_desc = relation_desc
427 .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
428 }
429 }
430 }
431
432 let relation_desc = relation_desc.finish();
433 Ok(StatementDesc::new(Some(relation_desc)))
434 }
435 ExplainAnalyzeProperty::Hints => {
436 let relation_desc = RelationDesc::builder()
437 .with_column("operator", SqlScalarType::String.nullable(true))
438 .with_column("levels", SqlScalarType::Int64.nullable(true))
439 .with_column("to_cut", SqlScalarType::Int64.nullable(true))
440 .with_column("hint", SqlScalarType::Float64.nullable(true))
441 .with_column("savings", SqlScalarType::String.nullable(true))
442 .finish();
443 Ok(StatementDesc::new(Some(relation_desc)))
444 }
445 }
446}
447
448pub fn describe_explain_analyze_cluster(
449 _scx: &StatementContext,
450 statement: ExplainAnalyzeClusterStatement,
451) -> Result<StatementDesc, PlanError> {
452 if statement.as_sql {
453 let relation_desc = RelationDesc::builder()
454 .with_column("SQL", SqlScalarType::String.nullable(false))
455 .finish();
456 return Ok(StatementDesc::new(Some(relation_desc)));
457 }
458
459 let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
460
461 let mut relation_desc = RelationDesc::builder()
462 .with_column("object", SqlScalarType::String.nullable(false))
463 .with_column("global_id", SqlScalarType::String.nullable(false));
464
465 if skew {
466 relation_desc =
467 relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
468 }
469
470 let mut seen_properties = BTreeSet::new();
471 for property in properties {
472 if !seen_properties.insert(property) {
474 continue;
475 }
476
477 match property {
478 ExplainAnalyzeComputationProperty::Memory if skew => {
479 let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
480 relation_desc = relation_desc
481 .with_column("max_operator_memory_ratio", numeric.clone())
482 .with_column("worker_memory", SqlScalarType::String.nullable(true))
483 .with_column("avg_memory", SqlScalarType::String.nullable(true))
484 .with_column("total_memory", SqlScalarType::String.nullable(true))
485 .with_column("max_operator_records_ratio", numeric.clone())
486 .with_column("worker_records", numeric.clone())
487 .with_column("avg_records", numeric.clone())
488 .with_column("total_records", numeric);
489 }
490 ExplainAnalyzeComputationProperty::Memory => {
491 relation_desc = relation_desc
492 .with_column("total_memory", SqlScalarType::String.nullable(true))
493 .with_column(
494 "total_records",
495 SqlScalarType::Numeric { max_scale: None }.nullable(true),
496 );
497 }
498 ExplainAnalyzeComputationProperty::Cpu if skew => {
499 relation_desc = relation_desc
500 .with_column(
501 "max_operator_cpu_ratio",
502 SqlScalarType::Numeric { max_scale: None }.nullable(true),
503 )
504 .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
505 .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
506 .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
507 }
508 ExplainAnalyzeComputationProperty::Cpu => {
509 relation_desc = relation_desc
510 .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
511 }
512 }
513 }
514
515 Ok(StatementDesc::new(Some(relation_desc.finish())))
516}
517
518pub fn describe_explain_timestamp(
519 scx: &StatementContext,
520 ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
521) -> Result<StatementDesc, PlanError> {
522 let relation_desc = RelationDesc::builder()
523 .with_column("Timestamp", SqlScalarType::String.nullable(false))
524 .finish();
525
526 Ok(StatementDesc::new(Some(relation_desc))
527 .with_params(describe_select(scx, select)?.param_types))
528}
529
530pub fn describe_explain_schema(
531 _: &StatementContext,
532 ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
533) -> Result<StatementDesc, PlanError> {
534 let relation_desc = RelationDesc::builder()
535 .with_column("Schema", SqlScalarType::String.nullable(false))
536 .finish();
537 Ok(StatementDesc::new(Some(relation_desc)))
538}
539
// Generates `ExplainPlanOptionExtracted`, the extracted form of the
// `WITH (...)` options accepted by `EXPLAIN ... PLAN`. `plan_explain_plan`
// builds it from the statement's options and converts it into an
// `ExplainConfig` via the `TryFrom` impl that follows.
//
// Each entry lists an option name, the type of its extracted value, and its
// `Default(..)` value. The `Option<bool>` entries default to `None` so the
// conversion can tell "not specified" apart from an explicit `true`/`false`:
// some fall back to a build-dependent default, while the optimizer feature
// overrides are passed through as-is.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    // Optimizer feature overrides: `None` leaves the feature's setting alone.
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
582
583impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
584 type Error = PlanError;
585
586 fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
587 if v.raw {
590 v.raw_plans = true;
591 v.raw_syntax = true;
592 }
593
594 let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();
597
598 Ok(ExplainConfig {
599 arity: v.arity.unwrap_or(enable_on_prod),
600 cardinality: v.cardinality,
601 column_names: v.column_names,
602 filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
603 humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
604 join_impls: v.join_implementations,
605 keys: v.keys,
606 linear_chains: !v.raw_plans && v.linear_chains,
607 no_fast_path: v.no_fast_path,
608 no_notices: v.no_notices,
609 node_ids: v.node_identifiers,
610 non_negative: v.non_negative,
611 raw_plans: v.raw_plans,
612 raw_syntax: v.raw_syntax,
613 verbose_syntax: false,
614 redacted: v.redacted,
615 subtree_size: v.subtree_size,
616 equivalences: v.equivalences,
617 timing: v.timing,
618 types: v.types,
619 features: OptimizerFeatureOverrides {
621 enable_eager_delta_joins: v.enable_eager_delta_joins,
622 enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
623 enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
624 enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
625 enable_consolidate_after_union_negate: Default::default(),
626 enable_reduce_mfp_fusion: Default::default(),
627 enable_cardinality_estimates: Default::default(),
628 persist_fast_path_limit: Default::default(),
629 reoptimize_imported_views: v.reoptimize_imported_views,
630 enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
631 enable_projection_pushdown_after_relation_cse: v
632 .enable_projection_pushdown_after_relation_cse,
633 enable_less_reduce_in_eqprop: Default::default(),
634 enable_dequadratic_eqprop_map: Default::default(),
635 enable_eq_classes_withholding_errors: Default::default(),
636 enable_fast_path_plan_insights: Default::default(),
637 enable_cast_elimination: Default::default(),
638 enable_case_literal_transform: Default::default(),
639 enable_simplify_quantified_comparisons: Default::default(),
640 enable_coalesce_case_transform: Default::default(),
641 },
642 })
643 }
644}
645
646fn plan_explainee(
647 scx: &StatementContext,
648 explainee: Explainee<Aug>,
649 params: &Params,
650) -> Result<plan::Explainee, PlanError> {
651 use crate::plan::ExplaineeStatement;
652
653 let is_replan = matches!(
654 explainee,
655 Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
656 );
657
658 let explainee = match explainee {
659 Explainee::View(name) | Explainee::ReplanView(name) => {
660 let item = scx.get_item_by_resolved_name(&name)?;
661 let item_type = item.item_type();
662 if item_type != CatalogItemType::View {
663 sql_bail!("Expected {name} to be a view, not a {item_type}");
664 }
665 match is_replan {
666 true => crate::plan::Explainee::ReplanView(item.id()),
667 false => crate::plan::Explainee::View(item.id()),
668 }
669 }
670 Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
671 let item = scx.get_item_by_resolved_name(&name)?;
672 let item_type = item.item_type();
673 if item_type != CatalogItemType::MaterializedView {
674 sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
675 }
676 match is_replan {
677 true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
678 false => crate::plan::Explainee::MaterializedView(item.id()),
679 }
680 }
681 Explainee::Index(name) | Explainee::ReplanIndex(name) => {
682 let item = scx.get_item_by_resolved_name(&name)?;
683 let item_type = item.item_type();
684 if item_type != CatalogItemType::Index {
685 sql_bail!("Expected {name} to be an index, not a {item_type}");
686 }
687 match is_replan {
688 true => crate::plan::Explainee::ReplanIndex(item.id()),
689 false => crate::plan::Explainee::Index(item.id()),
690 }
691 }
692 Explainee::Select(select, broken) => {
693 let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
694 crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
695 }
696 Explainee::CreateView(mut stmt, broken) => {
697 if stmt.if_exists != IfExistsBehavior::Skip {
698 stmt.if_exists = IfExistsBehavior::Skip;
703 } else {
704 sql_bail!(
705 "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
706 (the behavior is implied within the scope of an enclosing EXPLAIN)"
707 );
708 }
709
710 let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
711 sql_bail!("expected CreateViewPlan plan");
712 };
713
714 crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
715 }
716 Explainee::CreateMaterializedView(mut stmt, broken) => {
717 if stmt.if_exists != IfExistsBehavior::Skip {
718 stmt.if_exists = IfExistsBehavior::Skip;
723 } else {
724 sql_bail!(
725 "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
726 (the behavior is implied within the scope of an enclosing EXPLAIN)"
727 );
728 }
729
730 let Plan::CreateMaterializedView(plan) =
731 ddl::plan_create_materialized_view(scx, *stmt)?
732 else {
733 sql_bail!("expected CreateMaterializedViewPlan plan");
734 };
735
736 crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
737 broken,
738 plan,
739 })
740 }
741 Explainee::CreateIndex(mut stmt, broken) => {
742 if !stmt.if_not_exists {
743 stmt.if_not_exists = true;
746 } else {
747 sql_bail!(
748 "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
749 (the behavior is implied within the scope of an enclosing EXPLAIN)"
750 );
751 }
752
753 let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
754 sql_bail!("expected CreateIndexPlan plan");
755 };
756
757 crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
758 }
759 Explainee::Subscribe(stmt, broken) => {
760 let Plan::Subscribe(plan) = plan_subscribe(scx, *stmt, params, None)? else {
761 sql_bail!("expected SubscribePlan");
762 };
763 crate::plan::Explainee::Statement(ExplaineeStatement::Subscribe { broken, plan })
764 }
765 };
766
767 Ok(explainee)
768}
769
770pub fn plan_explain_plan(
771 scx: &StatementContext,
772 explain: ExplainPlanStatement<Aug>,
773 params: &Params,
774) -> Result<Plan, PlanError> {
775 let (format, verbose_syntax) = match explain.format() {
776 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
777 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
778 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
779 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
780 };
781 let stage = explain.stage();
782
783 let mut config = {
785 let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
786
787 if !scx.catalog.system_vars().persist_stats_filter_enabled() {
788 with_options.filter_pushdown = Some(false);
790 }
791
792 ExplainConfig::try_from(with_options)?
793 };
794 config.verbose_syntax = verbose_syntax;
795
796 let explainee = plan_explainee(scx, explain.explainee, params)?;
797
798 Ok(Plan::ExplainPlan(ExplainPlanPlan {
799 stage,
800 format,
801 config,
802 explainee,
803 }))
804}
805
806pub fn plan_explain_schema(
807 scx: &StatementContext,
808 explain_schema: ExplainSinkSchemaStatement<Aug>,
809) -> Result<Plan, PlanError> {
810 let ExplainSinkSchemaStatement {
811 schema_for,
812 format: _,
814 mut statement,
815 } = explain_schema;
816
817 statement.name = Some(UnresolvedItemName::qualified(&[
821 ident!("mz_catalog"),
822 ident!("mz_explain_schema"),
823 ]));
824
825 crate::pure::purify_create_sink_avro_doc_on_options(
826 scx.catalog,
827 *statement.from.item_id(),
828 &mut statement.format,
829 )?;
830
831 match ddl::plan_create_sink(scx, statement)? {
832 Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
833 StorageSinkConnection::Kafka(KafkaSinkConnection {
834 format:
835 KafkaSinkFormat {
836 key_format,
837 value_format:
838 KafkaSinkFormatType::Avro {
839 schema: value_schema,
840 ..
841 },
842 ..
843 },
844 ..
845 }) => {
846 let schema = match schema_for {
847 ExplainSinkSchemaFor::Key => key_format
848 .and_then(|f| match f {
849 KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
850 _ => None,
851 })
852 .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
853 ExplainSinkSchemaFor::Value => value_schema,
854 };
855
856 Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
857 sink_from: sink.from,
858 json_schema: schema,
859 }))
860 }
861 _ => bail_unsupported!(
862 "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
863 ),
864 },
865 _ => bail_internal!("plan_sink did not produce a CreateSink plan"),
866 }
867}
868
869pub fn plan_explain_pushdown(
870 scx: &StatementContext,
871 statement: ExplainPushdownStatement<Aug>,
872 params: &Params,
873) -> Result<Plan, PlanError> {
874 scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
875 let explainee = plan_explainee(scx, statement.explainee, params)?;
876 Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
877}
878
879pub fn plan_explain_analyze_object(
880 scx: &StatementContext,
881 statement: ExplainAnalyzeObjectStatement<Aug>,
882 params: &Params,
883) -> Result<Plan, PlanError> {
884 let explainee_name = statement
885 .explainee
886 .name()
887 .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
888 .full_name_str();
889 let explainee = plan_explainee(scx, statement.explainee, params)?;
890
891 let check_ownership = |item_id: &CatalogItemId, item_type: &str| -> Result<(), PlanError> {
892 if scx.catalog.restrict_to_user_objects() {
893 let item = scx.catalog.get_item(item_id);
894 if item.owner_id() != *scx.catalog.active_role_id() {
895 let full_name = scx.catalog.resolve_full_name(item.name());
896 return Err(sql_err!("must be owner of {item_type} {full_name}"));
897 }
898 }
899 Ok(())
900 };
901 match &explainee {
902 plan::Explainee::Index(item_id) => check_ownership(item_id, "INDEX")?,
903 plan::Explainee::MaterializedView(item_id) => {
904 check_ownership(item_id, "MATERIALIZED VIEW")?
905 }
906 _ => return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",)),
907 };
908
909 let mut ctes = Vec::with_capacity(4); let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
924 let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
925 let mut predicates = vec![format!(
926 "mo.name = {}",
927 escaped_string_literal(&explainee_name)
928 )];
929 let mut order_by = vec!["mlm.lir_id DESC"];
930
931 match statement.properties {
932 ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
933 properties,
934 skew,
935 }) => {
936 let mut worker_id = None;
937 let mut seen_properties = BTreeSet::new();
938 for property in properties {
939 if !seen_properties.insert(property) {
941 continue;
942 }
943
944 match property {
945 ExplainAnalyzeComputationProperty::Memory => {
946 ctes.push((
947 "summary_memory",
948 r#"
949 SELECT mlm.global_id AS global_id,
950 mlm.lir_id AS lir_id,
951 SUM(mas.size) AS total_memory,
952 SUM(mas.records) AS total_records,
953 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
954 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
955 FROM mz_introspection.mz_lir_mapping mlm
956 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
957 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
958 ON (mas.operator_id = valid_id)
959GROUP BY mlm.global_id, mlm.lir_id"#,
960 ));
961 from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");
962
963 if skew {
964 ctes.push((
965 "per_worker_memory",
966 r#"
967 SELECT mlm.global_id AS global_id,
968 mlm.lir_id AS lir_id,
969 mas.worker_id AS worker_id,
970 SUM(mas.size) AS worker_memory,
971 SUM(mas.records) AS worker_records
972 FROM mz_introspection.mz_lir_mapping mlm
973 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
974 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
975 ON (mas.operator_id = valid_id)
976GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
977 ));
978 from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");
979
980 if let Some(worker_id) = worker_id {
981 predicates.push(format!("pwm.worker_id = {worker_id}"));
982 } else {
983 worker_id = Some("pwm.worker_id");
984 columns.push("pwm.worker_id AS worker_id");
985 order_by.push("worker_id");
986 }
987
988 columns.extend([
989 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
990 "pg_size_pretty(pwm.worker_memory) AS worker_memory",
991 "pg_size_pretty(sm.avg_memory) AS avg_memory",
992 "pg_size_pretty(sm.total_memory) AS total_memory",
993 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
994 "pwm.worker_records AS worker_records",
995 "sm.avg_records AS avg_records",
996 "sm.total_records AS total_records",
997 ]);
998 } else {
999 columns.extend([
1000 "pg_size_pretty(sm.total_memory) AS total_memory",
1001 "sm.total_records AS total_records",
1002 ]);
1003 }
1004 }
1005 ExplainAnalyzeComputationProperty::Cpu => {
1006 ctes.push((
1007 "summary_cpu",
1008 r#"
1009 SELECT mlm.global_id AS global_id,
1010 mlm.lir_id AS lir_id,
1011 SUM(mse.elapsed_ns) AS total_ns,
1012 CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
1013 FROM mz_introspection.mz_lir_mapping mlm
1014 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1015 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1016 ON (mse.id = valid_id)
1017GROUP BY mlm.global_id, mlm.lir_id"#,
1018 ));
1019 from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");
1020
1021 if skew {
1022 ctes.push((
1023 "per_worker_cpu",
1024 r#"
1025 SELECT mlm.global_id AS global_id,
1026 mlm.lir_id AS lir_id,
1027 mse.worker_id AS worker_id,
1028 SUM(mse.elapsed_ns) AS worker_ns
1029 FROM mz_introspection.mz_lir_mapping mlm
1030 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1031 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1032 ON (mse.id = valid_id)
1033GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1034 ));
1035 from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");
1036
1037 if let Some(worker_id) = worker_id {
1038 predicates.push(format!("pwc.worker_id = {worker_id}"));
1039 } else {
1040 worker_id = Some("pwc.worker_id");
1041 columns.push("pwc.worker_id AS worker_id");
1042 order_by.push("worker_id");
1043 }
1044
1045 columns.extend([
1046 "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
1047 "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
1048 "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
1049 ]);
1050 }
1051 columns.push(
1052 "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
1053 );
1054 }
1055 }
1056 }
1057 }
1058 ExplainAnalyzeProperty::Hints => {
1059 columns.extend([
1060 "megsa.levels AS levels",
1061 "megsa.to_cut AS to_cut",
1062 "megsa.hint AS hint",
1063 "pg_size_pretty(megsa.savings) AS savings",
1064 ]);
1065 from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
1066 "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
1067 mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
1068 }
1069 }
1070
1071 from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");
1072
1073 let ctes = if !ctes.is_empty() {
1074 format!(
1075 "WITH {}",
1076 separated(
1077 ",\n",
1078 ctes.iter()
1079 .map(|(name, defn)| format!("{name} AS ({defn})"))
1080 )
1081 )
1082 } else {
1083 String::new()
1084 };
1085 let columns = separated(", ", columns);
1086 let from = separated(" ", from);
1087 let predicates = separated(" AND ", predicates);
1088 let order_by = separated(", ", order_by);
1089 let query = format!(
1090 r#"{ctes}
1091SELECT {columns}
1092FROM {from}
1093WHERE {predicates}
1094ORDER BY {order_by}"#
1095 );
1096
1097 if statement.as_sql {
1098 let rows = vec![Row::pack_slice(&[Datum::String(
1099 &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1100 PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1101 })?,
1102 )])];
1103 let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1104
1105 Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1106 } else {
1107 let (show_select, resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1108 scx.record_sql_impl_ids(&resolved_ids);
1109 show_select.plan()
1110 }
1111}
1112
1113pub fn plan_explain_analyze_cluster(
1114 scx: &StatementContext,
1115 statement: ExplainAnalyzeClusterStatement,
1116 _params: &Params,
1117) -> Result<Plan, PlanError> {
1118 let mut ctes = Vec::with_capacity(4); let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
1143 let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
1144 let mut predicates = vec![];
1145 let mut order_by = vec![];
1146
1147 let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
1148 let mut worker_id = None;
1149 let mut seen_properties = BTreeSet::new();
1150 for property in properties {
1151 if !seen_properties.insert(property) {
1153 continue;
1154 }
1155
1156 match property {
1157 ExplainAnalyzeComputationProperty::Memory => {
1158 if skew {
1159 let mut set_worker_id = false;
1160 if let Some(worker_id) = worker_id {
1161 predicates.push(format!("om.worker_id = {worker_id}"));
1163 } else {
1164 worker_id = Some("om.worker_id");
1165 columns.push("om.worker_id AS worker_id");
1166 set_worker_id = true; };
1168
1169 ctes.push((
1171 "per_operator_memory_summary",
1172 r#"
1173SELECT mlm.global_id AS global_id,
1174 mlm.lir_id AS lir_id,
1175 SUM(mas.size) AS total_memory,
1176 SUM(mas.records) AS total_records,
1177 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
1178 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
1179FROM mz_introspection.mz_lir_mapping mlm
1180 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1181 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1182 ON (mas.operator_id = valid_id)
1183GROUP BY mlm.global_id, mlm.lir_id"#,
1184 ));
1185
1186 ctes.push((
1188 "per_operator_memory_per_worker",
1189 r#"
1190SELECT mlm.global_id AS global_id,
1191 mlm.lir_id AS lir_id,
1192 mas.worker_id AS worker_id,
1193 SUM(mas.size) AS worker_memory,
1194 SUM(mas.records) AS worker_records
1195FROM mz_introspection.mz_lir_mapping mlm
1196 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1197 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1198 ON (mas.operator_id = valid_id)
1199GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
1200 ));
1201
1202 ctes.push((
1204 "per_operator_memory_ratios",
1205 r#"
1206SELECT pompw.global_id AS global_id,
1207 pompw.lir_id AS lir_id,
1208 pompw.worker_id AS worker_id,
1209 CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
1210 CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
1211 FROM per_operator_memory_per_worker pompw
1212 JOIN per_operator_memory_summary poms
1213 USING (global_id, lir_id)
1214"#,
1215 ));
1216
1217 ctes.push((
1219 "object_memory",
1220 r#"
1221SELECT pompw.global_id AS global_id,
1222 pompw.worker_id AS worker_id,
1223 MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
1224 MAX(pomr.records_ratio) AS max_operator_records_ratio,
1225 SUM(pompw.worker_memory) AS worker_memory,
1226 SUM(pompw.worker_records) AS worker_records
1227FROM per_operator_memory_per_worker pompw
1228 JOIN per_operator_memory_ratios pomr
1229 USING (global_id, worker_id, lir_id)
1230GROUP BY pompw.global_id, pompw.worker_id
1231"#,
1232 ));
1233
1234 ctes.push(("object_average_memory", r#"
1236SELECT om.global_id AS global_id,
1237 SUM(om.worker_memory) AS total_memory,
1238 CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
1239 SUM(om.worker_records) AS total_records,
1240 CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
1241 FROM object_memory om
1242GROUP BY om.global_id"#));
1243
1244 from.push("LEFT JOIN object_memory om USING (global_id)");
1245 from.push("LEFT JOIN object_average_memory oam USING (global_id)");
1246
1247 columns.extend([
1248 "om.max_operator_memory_ratio AS max_operator_memory_ratio",
1249 "pg_size_pretty(om.worker_memory) AS worker_memory",
1250 "pg_size_pretty(oam.avg_memory) AS avg_memory",
1251 "pg_size_pretty(oam.total_memory) AS total_memory",
1252 "om.max_operator_records_ratio AS max_operator_records_ratio",
1253 "om.worker_records AS worker_records",
1254 "oam.avg_records AS avg_records",
1255 "oam.total_records AS total_records",
1256 ]);
1257
1258 order_by.extend([
1259 "max_operator_memory_ratio DESC NULLS LAST",
1260 "max_operator_records_ratio DESC NULLS LAST",
1261 "om.worker_memory DESC NULLS LAST",
1262 "worker_records DESC NULLS LAST",
1263 ]);
1264
1265 if set_worker_id {
1266 order_by.push("worker_id");
1267 }
1268 } else {
1269 ctes.push((
1271 "per_operator_memory_totals",
1272 r#"
1273 SELECT mlm.global_id AS global_id,
1274 mlm.lir_id AS lir_id,
1275 SUM(mas.size) AS total_memory,
1276 SUM(mas.records) AS total_records
1277 FROM mz_introspection.mz_lir_mapping mlm
1278 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1279 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
1280 ON (mas.operator_id = valid_id)
1281 GROUP BY mlm.global_id, mlm.lir_id"#,
1282 ));
1283
1284 ctes.push((
1285 "object_memory_totals",
1286 r#"
1287SELECT pomt.global_id AS global_id,
1288 SUM(pomt.total_memory) AS total_memory,
1289 SUM(pomt.total_records) AS total_records
1290FROM per_operator_memory_totals pomt
1291GROUP BY pomt.global_id
1292"#,
1293 ));
1294
1295 from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
1296 columns.extend([
1297 "pg_size_pretty(omt.total_memory) AS total_memory",
1298 "omt.total_records AS total_records",
1299 ]);
1300 order_by.extend([
1301 "omt.total_memory DESC NULLS LAST",
1302 "total_records DESC NULLS LAST",
1303 ]);
1304 }
1305 }
1306 ExplainAnalyzeComputationProperty::Cpu => {
1307 if skew {
1308 let mut set_worker_id = false;
1309 if let Some(worker_id) = worker_id {
1310 predicates.push(format!("oc.worker_id = {worker_id}"));
1312 } else {
1313 worker_id = Some("oc.worker_id");
1314 columns.push("oc.worker_id AS worker_id");
1315 set_worker_id = true; };
1317
1318 ctes.push((
1320 "per_operator_cpu_summary",
1321 r#"
1322SELECT mlm.global_id AS global_id,
1323 mlm.lir_id AS lir_id,
1324 SUM(mse.elapsed_ns) AS total_ns,
1325 CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
1326FROM mz_introspection.mz_lir_mapping mlm
1327CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1328 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1329 ON (mse.id = valid_id)
1330GROUP BY mlm.global_id, mlm.lir_id"#,
1331));
1332
1333 ctes.push((
1335 "per_operator_cpu_per_worker",
1336 r#"
1337SELECT mlm.global_id AS global_id,
1338 mlm.lir_id AS lir_id,
1339 mse.worker_id AS worker_id,
1340 SUM(mse.elapsed_ns) AS worker_ns
1341FROM mz_introspection.mz_lir_mapping mlm
1342CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1343 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1344 ON (mse.id = valid_id)
1345GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1346 ));
1347
1348 ctes.push((
1350 "per_operator_cpu_ratios",
1351 r#"
1352SELECT pocpw.global_id AS global_id,
1353 pocpw.lir_id AS lir_id,
1354 pocpw.worker_id AS worker_id,
1355 CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
1356FROM per_operator_cpu_per_worker pocpw
1357 JOIN per_operator_cpu_summary pocs
1358 USING (global_id, lir_id)
1359"#,
1360 ));
1361
1362 ctes.push((
1364 "object_cpu",
1365 r#"
1366SELECT pocpw.global_id AS global_id,
1367 pocpw.worker_id AS worker_id,
1368 MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
1369 SUM(pocpw.worker_ns) AS worker_ns
1370FROM per_operator_cpu_per_worker pocpw
1371 JOIN per_operator_cpu_ratios pomr
1372 USING (global_id, worker_id, lir_id)
1373GROUP BY pocpw.global_id, pocpw.worker_id
1374"#,
1375 ));
1376
1377 ctes.push((
1379 "object_average_cpu",
1380 r#"
1381SELECT oc.global_id AS global_id,
1382 SUM(oc.worker_ns) AS total_ns,
1383 CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
1384 FROM object_cpu oc
1385GROUP BY oc.global_id"#,));
1386
1387 from.push("LEFT JOIN object_cpu oc USING (global_id)");
1388 from.push("LEFT JOIN object_average_cpu oac USING (global_id)");
1389
1390 columns.extend([
1391 "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
1392 "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
1393 "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
1394 "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
1395 ]);
1396
1397 order_by.extend([
1398 "max_operator_cpu_ratio DESC NULLS LAST",
1399 "worker_elapsed DESC NULLS LAST",
1400 ]);
1401
1402 if set_worker_id {
1403 order_by.push("worker_id");
1404 }
1405 } else {
1406 ctes.push((
1408 "per_operator_cpu_totals",
1409 r#"
1410 SELECT mlm.global_id AS global_id,
1411 mlm.lir_id AS lir_id,
1412 SUM(mse.elapsed_ns) AS total_ns
1413 FROM mz_introspection.mz_lir_mapping mlm
1414 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1415 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1416 ON (mse.id = valid_id)
1417 GROUP BY mlm.global_id, mlm.lir_id"#,
1418 ));
1419
1420 ctes.push((
1421 "object_cpu_totals",
1422 r#"
1423SELECT poct.global_id AS global_id,
1424 SUM(poct.total_ns) AS total_ns
1425FROM per_operator_cpu_totals poct
1426GROUP BY poct.global_id
1427"#,
1428 ));
1429
1430 from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
1431 columns
1432 .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
1433 order_by.extend(["total_elapsed DESC NULLS LAST"]);
1434 }
1435 }
1436 }
1437 }
1438
1439 let ctes = if !ctes.is_empty() {
1441 format!(
1442 "WITH {}",
1443 separated(
1444 ",\n",
1445 ctes.iter()
1446 .map(|(name, defn)| format!("{name} AS ({defn})"))
1447 )
1448 )
1449 } else {
1450 String::new()
1451 };
1452 let columns = separated(", ", columns);
1453 let from = separated(" ", from);
1454 let predicates = if !predicates.is_empty() {
1455 format!("WHERE {}", separated(" AND ", predicates))
1456 } else {
1457 String::new()
1458 };
1459 order_by.push("mo.name DESC");
1461 let order_by = separated(", ", order_by);
1462 let query = format!(
1463 r#"{ctes}
1464SELECT {columns}
1465FROM {from}
1466{predicates}
1467ORDER BY {order_by}"#
1468 );
1469
1470 if statement.as_sql {
1471 let rows = vec![Row::pack_slice(&[Datum::String(
1472 &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1473 PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1474 })?,
1475 )])];
1476 let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1477
1478 Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1479 } else {
1480 let (show_select, resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1481 scx.record_sql_impl_ids(&resolved_ids);
1482 show_select.plan()
1483 }
1484}
1485
1486pub fn plan_explain_timestamp(
1487 scx: &StatementContext,
1488 explain: ExplainTimestampStatement<Aug>,
1489) -> Result<Plan, PlanError> {
1490 let (format, _verbose_syntax) = match explain.format() {
1491 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1492 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1493 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1494 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1495 };
1496
1497 let raw_plan = {
1498 let query::PlannedRootQuery {
1499 expr: raw_plan,
1500 desc: _,
1501 finishing: _,
1502 scope: _,
1503 } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1504 if raw_plan.contains_parameters()? {
1505 return Err(PlanError::ParameterNotAllowed(
1506 "EXPLAIN TIMESTAMP".to_string(),
1507 ));
1508 }
1509
1510 raw_plan
1511 };
1512 let when = query::plan_as_of(scx, explain.select.as_of)?;
1513
1514 Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1515 format,
1516 raw_plan,
1517 when,
1518 }))
1519}
1520
1521generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1522
1523pub fn describe_subscribe(
1524 scx: &StatementContext,
1525 stmt: SubscribeStatement<Aug>,
1526) -> Result<StatementDesc, PlanError> {
1527 let relation_desc = match stmt.relation {
1528 SubscribeRelation::Name(name) => {
1529 let item = scx.get_item_by_resolved_name(&name)?;
1530 match item.relation_desc() {
1531 Some(desc) => desc.into_owned(),
1532 None => sql_bail!(
1533 "'{}' cannot be subscribed to because it is a {}",
1534 name.full_name_str(),
1535 item.item_type(),
1536 ),
1537 }
1538 }
1539 SubscribeRelation::Query(query) => {
1540 let query::PlannedRootQuery { desc, .. } =
1541 query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
1542 desc
1543 }
1544 };
1545 let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
1546 let progress = progress.unwrap_or(false);
1547 let mut desc = RelationDesc::builder().with_column(
1548 "mz_timestamp",
1549 SqlScalarType::Numeric {
1550 max_scale: Some(NumericMaxScale::ZERO),
1551 }
1552 .nullable(false),
1553 );
1554 if progress {
1555 desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
1556 }
1557
1558 let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
1559 match stmt.output {
1560 SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
1561 desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
1562 for (name, mut ty) in relation_desc.into_iter() {
1563 if progress {
1564 ty.nullable = true;
1565 }
1566 desc = desc.with_column(name, ty);
1567 }
1568 }
1569 SubscribeOutput::EnvelopeUpsert { key_columns }
1570 | SubscribeOutput::EnvelopeDebezium { key_columns } => {
1571 desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
1572 let key_columns = key_columns
1573 .into_iter()
1574 .map(normalize::column_name)
1575 .collect_vec();
1576 let mut before_values_desc = RelationDesc::builder();
1577 let mut after_values_desc = RelationDesc::builder();
1578
1579 for column_name in &key_columns {
1581 let mut column_ty = relation_desc
1582 .get_by_name(column_name)
1583 .map(|(_pos, ty)| ty.clone())
1584 .ok_or_else(|| PlanError::UnknownColumn {
1585 table: None,
1586 column: column_name.clone(),
1587 similar: Box::new([]),
1588 })?;
1589 if progress {
1590 column_ty.nullable = true;
1591 }
1592 desc = desc.with_column(column_name, column_ty);
1593 }
1594
1595 for (mut name, mut ty) in relation_desc
1598 .into_iter()
1599 .filter(|(name, _ty)| !key_columns.contains(name))
1600 {
1601 ty.nullable = true;
1602 before_values_desc =
1603 before_values_desc.with_column(format!("before_{}", name), ty.clone());
1604 if debezium {
1605 name = format!("after_{}", name).into();
1606 }
1607 after_values_desc = after_values_desc.with_column(name, ty);
1608 }
1609
1610 if debezium {
1611 desc = desc.concat(before_values_desc);
1612 }
1613 desc = desc.concat(after_values_desc);
1614 }
1615 }
1616 Ok(StatementDesc::new(Some(desc.finish())))
1617}
1618
1619pub fn plan_subscribe(
1620 scx: &StatementContext,
1621 SubscribeStatement {
1622 relation,
1623 options,
1624 as_of,
1625 up_to,
1626 output,
1627 }: SubscribeStatement<Aug>,
1628 params: &Params,
1629 copy_to: Option<CopyFormat>,
1630) -> Result<Plan, PlanError> {
1631 let (from, desc, scope) = match relation {
1632 SubscribeRelation::Name(name) => {
1633 let item = scx.get_item_by_resolved_name(&name)?;
1634 let Some(desc) = item.relation_desc() else {
1635 sql_bail!(
1636 "'{}' cannot be subscribed to because it is a {}",
1637 name.full_name_str(),
1638 item.item_type(),
1639 );
1640 };
1641 let item_name = match name {
1642 ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
1643 _ => None,
1644 };
1645 let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
1646 (
1647 SubscribeFrom::Id(item.global_id()),
1648 desc.into_owned(),
1649 scope,
1650 )
1651 }
1652 SubscribeRelation::Query(query) => {
1653 #[allow(deprecated)] let query::PlannedRootQuery {
1655 mut expr,
1656 desc,
1657 finishing,
1658 scope,
1659 } = query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
1660 expr.bind_parameters_and_simplify_offset(scx, QueryLifetime::Subscribe, params)?;
1661 let query = query::PlannedRootQuery {
1662 expr,
1663 desc,
1664 finishing,
1665 scope,
1666 };
1667 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
1671 &query.finishing,
1672 query.desc.arity()
1673 ));
1674 let desc = query.desc.clone();
1675 (
1676 SubscribeFrom::Query {
1677 expr: query.expr,
1678 desc: query.desc,
1679 },
1680 desc,
1681 query.scope,
1682 )
1683 }
1684 };
1685
1686 let when = query::plan_as_of(scx, as_of)?;
1687 let up_to = up_to
1688 .map(|up_to| plan_as_of_or_up_to(scx, up_to))
1689 .transpose()?;
1690
1691 let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
1692 let ecx = ExprContext {
1693 qcx: &qcx,
1694 name: "",
1695 scope: &scope,
1696 relation_type: desc.typ(),
1697 allow_aggregates: false,
1698 allow_subqueries: true,
1699 allow_parameters: true,
1700 allow_windows: false,
1701 };
1702
1703 let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1704 let output = match output {
1705 SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
1706 SubscribeOutput::EnvelopeUpsert { key_columns } => {
1707 let order_by = key_columns
1708 .iter()
1709 .map(|ident| OrderByExpr {
1710 expr: Expr::Identifier(vec![ident.clone()]),
1711 asc: None,
1712 nulls_last: None,
1713 })
1714 .collect_vec();
1715 let (order_by, map_exprs) = query::plan_order_by_exprs(
1716 &ExprContext {
1717 name: "ENVELOPE UPSERT KEY clause",
1718 ..ecx
1719 },
1720 &order_by[..],
1721 &output_columns[..],
1722 )?;
1723 if !map_exprs.is_empty() {
1724 return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
1725 }
1726 plan::SubscribeOutput::EnvelopeUpsert {
1727 order_by_keys: order_by,
1728 }
1729 }
1730 SubscribeOutput::EnvelopeDebezium { key_columns } => {
1731 scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
1732 let order_by = key_columns
1733 .iter()
1734 .map(|ident| OrderByExpr {
1735 expr: Expr::Identifier(vec![ident.clone()]),
1736 asc: None,
1737 nulls_last: None,
1738 })
1739 .collect_vec();
1740 let (order_by, map_exprs) = query::plan_order_by_exprs(
1741 &ExprContext {
1742 name: "ENVELOPE DEBEZIUM KEY clause",
1743 ..ecx
1744 },
1745 &order_by[..],
1746 &output_columns[..],
1747 )?;
1748 if !map_exprs.is_empty() {
1749 return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
1750 }
1751 plan::SubscribeOutput::EnvelopeDebezium {
1752 order_by_keys: order_by,
1753 }
1754 }
1755 SubscribeOutput::WithinTimestampOrderBy { order_by } => {
1756 scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
1757 let mz_diff = "mz_diff".into();
1758 let output_columns = std::iter::once((0, &mz_diff))
1759 .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
1760 .collect_vec();
1761 match query::plan_order_by_exprs(
1762 &ExprContext {
1763 name: "WITHIN TIMESTAMP ORDER BY clause",
1764 ..ecx
1765 },
1766 &order_by[..],
1767 &output_columns[..],
1768 ) {
1769 Err(PlanError::UnknownColumn {
1770 table: None,
1771 column,
1772 similar: _,
1773 }) if &column == &mz_diff => {
1774 return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1777 }
1778 Err(e) => return Err(e),
1779 Ok((order_by, map_exprs)) => {
1780 if !map_exprs.is_empty() {
1781 return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1782 }
1783
1784 plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
1785 }
1786 }
1787 }
1788 };
1789
1790 let SubscribeOptionExtracted {
1791 progress, snapshot, ..
1792 } = options.try_into()?;
1793 Ok(Plan::Subscribe(SubscribePlan {
1794 from,
1795 when,
1796 up_to,
1797 with_snapshot: snapshot.unwrap_or(true),
1798 copy_to,
1799 emit_progress: progress.unwrap_or(false),
1800 output,
1801 }))
1802}
1803
1804pub fn describe_copy_from_table(
1805 scx: &StatementContext,
1806 table_name: <Aug as AstInfo>::ItemName,
1807 columns: Vec<Ident>,
1808) -> Result<StatementDesc, PlanError> {
1809 let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1810 Ok(StatementDesc::new(Some(desc)))
1811}
1812
1813pub fn describe_copy_item(
1814 scx: &StatementContext,
1815 object_name: <Aug as AstInfo>::ItemName,
1816 columns: Vec<Ident>,
1817) -> Result<StatementDesc, PlanError> {
1818 let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1819 Ok(StatementDesc::new(Some(desc)))
1820}
1821
1822pub fn describe_copy(
1823 scx: &StatementContext,
1824 CopyStatement {
1825 relation,
1826 direction,
1827 ..
1828 }: CopyStatement<Aug>,
1829) -> Result<StatementDesc, PlanError> {
1830 Ok(match (relation, direction) {
1831 (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1832 describe_copy_item(scx, name, columns)?
1833 }
1834 (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1835 describe_copy_from_table(scx, name, columns)?
1836 }
1837 (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1838 (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1839 }
1840 .with_is_copy())
1841}
1842
1843fn plan_copy_to_expr(
1844 scx: &StatementContext,
1845 select_plan: SelectPlan,
1846 desc: RelationDesc,
1847 to: &Expr<Aug>,
1848 format: CopyFormat,
1849 options: CopyOptionExtracted,
1850) -> Result<Plan, PlanError> {
1851 let conn_id = match options.aws_connection {
1852 Some(conn_id) => CatalogItemId::from(conn_id),
1853 None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1854 };
1855 let connection = scx.get_item(&conn_id).connection()?;
1856
1857 match connection {
1858 mz_storage_types::connections::Connection::Aws(_) => {}
1859 _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1860 }
1861
1862 let format = match format {
1863 CopyFormat::Csv => {
1864 let quote = extract_byte_param_value(options.quote, "quote")?;
1865 let escape = extract_byte_param_value(options.escape, "escape")?;
1866 let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1867 S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1868 CopyCsvFormatParams::try_new(
1869 delimiter,
1870 quote,
1871 escape,
1872 options.header,
1873 options.null,
1874 )
1875 .map_err(|e| sql_err!("{}", e))?,
1876 ))
1877 }
1878 CopyFormat::Parquet => {
1879 ArrowBuilder::validate_desc_for_parquet(&desc, |_| None)
1882 .map_err(|e| sql_err!("{}", e))?;
1883 S3SinkFormat::Parquet
1884 }
1885 CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1886 CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1887 };
1888
1889 let mut to_expr = to.clone();
1891 transform_ast::transform(scx, &mut to_expr)?;
1892 let relation_type = RelationDesc::empty();
1893 let ecx = &ExprContext {
1894 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1895 name: "COPY TO target",
1896 scope: &Scope::empty(),
1897 relation_type: relation_type.typ(),
1898 allow_aggregates: false,
1899 allow_subqueries: false,
1900 allow_parameters: false,
1901 allow_windows: false,
1902 };
1903
1904 let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;
1905
1906 if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1907 sql_bail!(
1908 "MAX FILE SIZE cannot be less than {}",
1909 MIN_S3_SINK_FILE_SIZE
1910 );
1911 }
1912 if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1913 sql_bail!(
1914 "MAX FILE SIZE cannot be greater than {}",
1915 MAX_S3_SINK_FILE_SIZE
1916 );
1917 }
1918
1919 Ok(Plan::CopyTo(CopyToPlan {
1920 select_plan,
1921 desc,
1922 to,
1923 connection: connection.to_owned(),
1924 connection_id: conn_id,
1925 format,
1926 max_file_size: options.max_file_size.as_bytes(),
1927 }))
1928}
1929
1930fn plan_copy_from(
1931 scx: &StatementContext,
1932 target: &CopyTarget<Aug>,
1933 table_name: ResolvedItemName,
1934 columns: Vec<Ident>,
1935 format: CopyFormat,
1936 options: CopyOptionExtracted,
1937) -> Result<Plan, PlanError> {
1938 fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
1939 match option {
1940 Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
1941 None => Ok(()),
1942 }
1943 }
1944
1945 let source = match target {
1946 CopyTarget::Stdin => CopyFromSource::Stdin,
1947 CopyTarget::Expr(from) => {
1948 let mut from_expr = from.clone();
1950 transform_ast::transform(scx, &mut from_expr)?;
1951 let relation_type = RelationDesc::empty();
1952 let ecx = &ExprContext {
1953 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1954 name: "COPY FROM target",
1955 scope: &Scope::empty(),
1956 relation_type: relation_type.typ(),
1957 allow_aggregates: false,
1958 allow_subqueries: false,
1959 allow_parameters: false,
1960 allow_windows: false,
1961 };
1962 let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;
1963
1964 match options.aws_connection {
1965 Some(conn_id) => {
1966 let conn_id = CatalogItemId::from(conn_id);
1967
1968 let connection = match scx.get_item(&conn_id).connection()? {
1970 mz_storage_types::connections::Connection::Aws(conn) => conn,
1971 _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
1972 };
1973
1974 CopyFromSource::AwsS3 {
1975 uri: from,
1976 connection,
1977 connection_id: conn_id,
1978 }
1979 }
1980 None => CopyFromSource::Url(from),
1981 }
1982 }
1983 CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
1984 };
1985
1986 let params = match format {
1987 CopyFormat::Text => {
1988 only_available_with_csv(options.quote, "quote")?;
1989 only_available_with_csv(options.escape, "escape")?;
1990 only_available_with_csv(options.header, "HEADER")?;
1991 let delimiter =
1992 extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
1993 let null = match options.null {
1994 Some(null) => Cow::from(null),
1995 None => Cow::from("\\N"),
1996 };
1997 CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
1998 }
1999 CopyFormat::Csv => {
2000 let quote = extract_byte_param_value(options.quote, "quote")?;
2001 let escape = extract_byte_param_value(options.escape, "escape")?;
2002 let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
2003 CopyFormatParams::Csv(
2004 CopyCsvFormatParams::try_new(
2005 delimiter,
2006 quote,
2007 escape,
2008 options.header,
2009 options.null,
2010 )
2011 .map_err(|e| sql_err!("{}", e))?,
2012 )
2013 }
2014 CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
2015 CopyFormat::Parquet => CopyFormatParams::Parquet,
2016 };
2017
2018 let filter = match (options.files, options.pattern) {
2019 (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
2020 (Some(files), None) => Some(CopyFromFilter::Files(files)),
2021 (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
2022 (None, None) => None,
2023 };
2024
2025 if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
2026 bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
2027 }
2028
2029 let table_name_string = table_name.full_name_str();
2030
2031 let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;
2032
2033 let Some(mfp) = maybe_mfp else {
2034 sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
2035 };
2036
2037 Ok(Plan::CopyFrom(CopyFromPlan {
2038 target_id: id,
2039 target_name: table_name_string,
2040 source,
2041 columns,
2042 source_desc,
2043 mfp,
2044 params,
2045 filter,
2046 }))
2047}
2048
2049fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2050 match v {
2051 Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2052 Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2053 None => Ok(None),
2054 }
2055}
2056
// Typed extraction of the `WITH (...)` option list accepted by `COPY`.
//
// This invocation produces `CopyOptionExtracted`, which `plan_copy` builds via
// `CopyOptionExtracted::try_from(options)` and then reads field by field
// (`options.format`, `options.delimiter`, `options.null`, `options.files`, ...).
// Each entry pairs an option name with the type its value is converted to.
// `MaxFileSize` is the only entry with an explicit default, `ByteSize::mb(256)`.
// NOTE(review): the other fields are presumably `Option`s that are `None` when
// the option is omitted (callers test them with `is_some()`); confirm against
// the macro definition.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
2070
2071pub fn plan_copy(
2072 scx: &StatementContext,
2073 CopyStatement {
2074 relation,
2075 direction,
2076 target,
2077 options,
2078 }: CopyStatement<Aug>,
2079) -> Result<Plan, PlanError> {
2080 let options = CopyOptionExtracted::try_from(options)?;
2081 let format = options
2084 .format
2085 .as_ref()
2086 .map(|format| match format.to_lowercase().as_str() {
2087 "text" => Ok(CopyFormat::Text),
2088 "csv" => Ok(CopyFormat::Csv),
2089 "binary" => Ok(CopyFormat::Binary),
2090 "parquet" => Ok(CopyFormat::Parquet),
2091 _ => sql_bail!("unknown FORMAT: {}", format),
2092 })
2093 .transpose()?;
2094
2095 match (&direction, &target) {
2096 (CopyDirection::To, CopyTarget::Stdout) => {
2097 if options.delimiter.is_some() {
2098 sql_bail!("COPY TO does not support DELIMITER option yet");
2099 }
2100 if options.quote.is_some() {
2101 sql_bail!("COPY TO does not support QUOTE option yet");
2102 }
2103 if options.null.is_some() {
2104 sql_bail!("COPY TO does not support NULL option yet");
2105 }
2106 match relation {
2107 CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
2108 CopyRelation::Select(stmt) => Ok(plan_select(
2109 scx,
2110 stmt,
2111 &Params::empty(),
2112 Some(format.unwrap_or(CopyFormat::Text)),
2113 )?),
2114 CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
2115 scx,
2116 stmt,
2117 &Params::empty(),
2118 Some(format.unwrap_or(CopyFormat::Text)),
2119 )?),
2120 }
2121 }
2122 (CopyDirection::From, target) => match relation {
2123 CopyRelation::Named { name, columns } => plan_copy_from(
2124 scx,
2125 target,
2126 name,
2127 columns,
2128 format.unwrap_or(CopyFormat::Text),
2129 options,
2130 ),
2131 _ => sql_bail!("COPY FROM {} not supported", target),
2132 },
2133 (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
2134 let format = match format {
2135 Some(inner) => inner,
2136 _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
2137 };
2138
2139 let stmt = match relation {
2140 CopyRelation::Named { name, columns } => {
2141 if !columns.is_empty() {
2142 sql_bail!(
2144 "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
2145 );
2146 }
2147 let query = Query {
2149 ctes: CteBlock::empty(),
2150 body: SetExpr::Table(name),
2151 order_by: vec![],
2152 limit: None,
2153 offset: None,
2154 };
2155 SelectStatement { query, as_of: None }
2156 }
2157 CopyRelation::Select(stmt) => {
2158 if !stmt.query.order_by.is_empty() {
2159 sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
2160 }
2161 stmt
2162 }
2163 CopyRelation::Subscribe(_) => {
2164 sql_bail!("COPY {} {} not supported", direction, target)
2165 }
2166 };
2167
2168 let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
2169 plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
2170 }
2171 _ => sql_bail!("COPY {} {} not supported", direction, target),
2172 }
2173}