1use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19use mz_arrow_util::builder::ArrowBuilder;
20use mz_expr::{ColumnOrder, RowSetFinishing};
21use mz_ore::num::NonNeg;
22use mz_ore::soft_panic_or_log;
23use mz_ore::str::separated;
24use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
25use mz_repr::adt::numeric::NumericMaxScale;
26use mz_repr::bytes::ByteSize;
27use mz_repr::explain::{ExplainConfig, ExplainFormat};
28use mz_repr::optimize::OptimizerFeatureOverrides;
29use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
30use mz_sql_parser::ast::{
31 CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
32 ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
33 ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
34 ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
35 SetExpr, SubscribeOutput, UnresolvedItemName,
36};
37use mz_sql_parser::ident;
38use mz_storage_types::sinks::{
39 KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
40 MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
41};
42
43use crate::ast::display::{AstDisplay, escaped_string_literal};
44use crate::ast::{
45 AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
46 DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
47 SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
48 UpdateStatement,
49};
50use crate::catalog::CatalogItemType;
51use crate::names::{Aug, ResolvedItemName};
52use crate::normalize;
53use crate::plan::query::{
54 ExprContext, QueryLifetime, negative_offset_error, offset_into_value, plan_as_of_or_up_to,
55 plan_expr,
56};
57use crate::plan::scope::Scope;
58use crate::plan::statement::show::ShowSelect;
59use crate::plan::statement::{StatementContext, StatementDesc, ddl};
60use crate::plan::{
61 self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
62 ExplainTimestampPlan, HirRelationExpr, side_effecting_func, transform_ast,
63};
64use crate::plan::{
65 CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
66 QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
67};
68use crate::plan::{CopyFromSource, with_options};
69use crate::session::vars::{self, DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF};
70
71pub fn describe_insert(
78 scx: &StatementContext,
79 InsertStatement {
80 table_name,
81 columns,
82 source,
83 returning,
84 }: InsertStatement<Aug>,
85) -> Result<StatementDesc, PlanError> {
86 let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
87 let desc = if returning.expr.is_empty() {
88 None
89 } else {
90 Some(returning.desc)
91 };
92 Ok(StatementDesc::new(desc))
93}
94
95pub fn plan_insert(
96 scx: &StatementContext,
97 InsertStatement {
98 table_name,
99 columns,
100 source,
101 returning,
102 }: InsertStatement<Aug>,
103 params: &Params,
104) -> Result<Plan, PlanError> {
105 let (id, mut expr, returning) =
106 query::plan_insert_query(scx, table_name, columns, source, returning)?;
107 expr.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
108 let returning = returning
109 .expr
110 .into_iter()
111 .map(|mut expr| {
112 expr.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
113 expr.lower_uncorrelated(scx.catalog.system_vars())
114 })
115 .collect::<Result<Vec<_>, _>>()?;
116
117 Ok(Plan::Insert(InsertPlan {
118 id,
119 values: expr,
120 returning,
121 }))
122}
123
124pub fn describe_delete(
125 scx: &StatementContext,
126 stmt: DeleteStatement<Aug>,
127) -> Result<StatementDesc, PlanError> {
128 query::plan_delete_query(scx, stmt)?;
129 Ok(StatementDesc::new(None))
130}
131
132pub fn plan_delete(
133 scx: &StatementContext,
134 stmt: DeleteStatement<Aug>,
135 params: &Params,
136) -> Result<Plan, PlanError> {
137 let rtw_plan = query::plan_delete_query(scx, stmt)?;
138 plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
139}
140
141pub fn describe_update(
142 scx: &StatementContext,
143 stmt: UpdateStatement<Aug>,
144) -> Result<StatementDesc, PlanError> {
145 query::plan_update_query(scx, stmt)?;
146 Ok(StatementDesc::new(None))
147}
148
149pub fn plan_update(
150 scx: &StatementContext,
151 stmt: UpdateStatement<Aug>,
152 params: &Params,
153) -> Result<Plan, PlanError> {
154 let rtw_plan = query::plan_update_query(scx, stmt)?;
155 plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
156}
157
158pub fn plan_read_then_write(
159 scx: &StatementContext,
160 kind: MutationKind,
161 params: &Params,
162 query::ReadThenWritePlan {
163 id,
164 mut selection,
165 finishing,
166 assignments,
167 }: query::ReadThenWritePlan,
168) -> Result<Plan, PlanError> {
169 selection.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
170 let mut assignments_outer = BTreeMap::new();
171 for (idx, mut set) in assignments {
172 set.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
173 let set = set.lower_uncorrelated(scx.catalog.system_vars())?;
174 assignments_outer.insert(idx, set);
175 }
176
177 Ok(Plan::ReadThenWrite(ReadThenWritePlan {
178 id,
179 selection,
180 finishing,
181 assignments: assignments_outer,
182 kind,
183 returning: Vec::new(),
184 }))
185}
186
187pub fn describe_select(
188 scx: &StatementContext,
189 stmt: SelectStatement<Aug>,
190) -> Result<StatementDesc, PlanError> {
191 if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
192 return Ok(StatementDesc::new(Some(desc)));
193 }
194
195 let query::PlannedRootQuery { desc, .. } =
196 query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
197 Ok(StatementDesc::new(Some(desc)))
198}
199
200pub fn plan_select(
201 scx: &StatementContext,
202 select: SelectStatement<Aug>,
203 params: &Params,
204 copy_to: Option<CopyFormat>,
205) -> Result<Plan, PlanError> {
206 if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
207 return Ok(Plan::SideEffectingFunc(f));
208 }
209
210 let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
211 Ok(Plan::Select(plan))
212}
213
214fn plan_select_inner(
215 scx: &StatementContext,
216 select: SelectStatement<Aug>,
217 params: &Params,
218 copy_to: Option<CopyFormat>,
219) -> Result<(SelectPlan, RelationDesc), PlanError> {
220 let when = query::plan_as_of(scx, select.as_of.clone())?;
221 let lifetime = QueryLifetime::OneShot;
222 let query::PlannedRootQuery {
223 mut expr,
224 desc,
225 finishing,
226 scope: _,
227 } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
228 expr.bind_parameters_and_simplify_offset(scx, lifetime, params)?;
229
230 let limit = match finishing.limit {
236 None => None,
237 Some(mut limit) => {
238 limit.bind_parameters_and_simplify_offset(scx, lifetime, params)?;
239 let Some(limit) = limit.as_literal() else {
241 sql_bail!(
242 "Top-level LIMIT must be a constant expression, got {}",
243 limit
244 )
245 };
246 match limit {
247 Datum::Null => None,
248 Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
249 _ => {
250 soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
251 sql_bail!("LIMIT must be a non-negative INT or NULL")
252 }
253 }
254 }
255 };
256 let offset = {
257 let mut offset = finishing.offset.clone();
258 offset.bind_parameters_and_simplify_offset(scx, lifetime, params)?;
259 let offset = offset_into_value(offset.take())?;
260 offset.try_into().map_err(|_| {
261 soft_panic_or_log!("unexpectedly negative OFFSET");
263 negative_offset_error(offset)
264 })?
265 };
266
267 if scx.is_feature_flag_enabled(&DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF)
271 && select.as_of.is_some()
272 && expr.contains_unmaterializable_except_temporal()
273 {
274 bail_unsupported!("unmaterializable function (except `mz_now`) in an AS OF query");
275 }
276
277 let plan = SelectPlan {
278 source: expr,
279 when,
280 finishing: RowSetFinishing {
281 limit,
282 offset,
283 project: finishing.project,
284 order_by: finishing.order_by,
285 },
286 copy_to,
287 select: Some(Box::new(select)),
288 };
289
290 Ok((plan, desc))
291}
292
293pub fn describe_explain_plan(
294 scx: &StatementContext,
295 explain: ExplainPlanStatement<Aug>,
296) -> Result<StatementDesc, PlanError> {
297 let mut relation_desc = RelationDesc::builder();
298
299 match explain.stage() {
300 ExplainStage::RawPlan => {
301 let name = "Raw Plan";
302 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
303 }
304 ExplainStage::DecorrelatedPlan => {
305 let name = "Decorrelated Plan";
306 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
307 }
308 ExplainStage::LocalPlan => {
309 let name = "Locally Optimized Plan";
310 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
311 }
312 ExplainStage::GlobalPlan => {
313 let name = "Optimized Plan";
314 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
315 }
316 ExplainStage::PhysicalPlan => {
317 let name = "Physical Plan";
318 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
319 }
320 ExplainStage::Trace => {
321 relation_desc = relation_desc
322 .with_column("Time", SqlScalarType::UInt64.nullable(false))
323 .with_column("Path", SqlScalarType::String.nullable(false))
324 .with_column("Plan", SqlScalarType::String.nullable(false));
325 }
326 ExplainStage::PlanInsights => {
327 let name = "Plan Insights";
328 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
329 }
330 };
331 let relation_desc = relation_desc.finish();
332
333 Ok(
334 StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
335 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
336 _ => vec![],
337 }),
338 )
339}
340
341pub fn describe_explain_pushdown(
342 scx: &StatementContext,
343 statement: ExplainPushdownStatement<Aug>,
344) -> Result<StatementDesc, PlanError> {
345 let relation_desc = RelationDesc::builder()
346 .with_column("Source", SqlScalarType::String.nullable(false))
347 .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
348 .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
349 .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
350 .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
351 .finish();
352
353 Ok(
354 StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
355 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
356 _ => vec![],
357 }),
358 )
359}
360
361pub fn describe_explain_analyze_object(
362 _scx: &StatementContext,
363 statement: ExplainAnalyzeObjectStatement<Aug>,
364) -> Result<StatementDesc, PlanError> {
365 if statement.as_sql {
366 let relation_desc = RelationDesc::builder()
367 .with_column("SQL", SqlScalarType::String.nullable(false))
368 .finish();
369 return Ok(StatementDesc::new(Some(relation_desc)));
370 }
371
372 match statement.properties {
373 ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
374 properties,
375 skew,
376 }) => {
377 let mut relation_desc = RelationDesc::builder()
378 .with_column("operator", SqlScalarType::String.nullable(false));
379
380 if skew {
381 relation_desc =
382 relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
383 }
384
385 let mut seen_properties = BTreeSet::new();
386 for property in properties {
387 if !seen_properties.insert(property) {
389 continue;
390 }
391
392 match property {
393 ExplainAnalyzeComputationProperty::Memory if skew => {
394 let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
395 relation_desc = relation_desc
396 .with_column("memory_ratio", numeric.clone())
397 .with_column("worker_memory", SqlScalarType::String.nullable(true))
398 .with_column("avg_memory", SqlScalarType::String.nullable(true))
399 .with_column("total_memory", SqlScalarType::String.nullable(true))
400 .with_column("records_ratio", numeric.clone())
401 .with_column("worker_records", numeric.clone())
402 .with_column("avg_records", numeric.clone())
403 .with_column("total_records", numeric);
404 }
405 ExplainAnalyzeComputationProperty::Memory => {
406 relation_desc = relation_desc
407 .with_column("total_memory", SqlScalarType::String.nullable(true))
408 .with_column(
409 "total_records",
410 SqlScalarType::Numeric { max_scale: None }.nullable(true),
411 );
412 }
413 ExplainAnalyzeComputationProperty::Cpu => {
414 if skew {
415 relation_desc = relation_desc
416 .with_column(
417 "cpu_ratio",
418 SqlScalarType::Numeric { max_scale: None }.nullable(true),
419 )
420 .with_column(
421 "worker_elapsed",
422 SqlScalarType::Interval.nullable(true),
423 )
424 .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
425 }
426 relation_desc = relation_desc
427 .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
428 }
429 }
430 }
431
432 let relation_desc = relation_desc.finish();
433 Ok(StatementDesc::new(Some(relation_desc)))
434 }
435 ExplainAnalyzeProperty::Hints => {
436 let relation_desc = RelationDesc::builder()
437 .with_column("operator", SqlScalarType::String.nullable(true))
438 .with_column("levels", SqlScalarType::Int64.nullable(true))
439 .with_column("to_cut", SqlScalarType::Int64.nullable(true))
440 .with_column("hint", SqlScalarType::Float64.nullable(true))
441 .with_column("savings", SqlScalarType::String.nullable(true))
442 .finish();
443 Ok(StatementDesc::new(Some(relation_desc)))
444 }
445 }
446}
447
448pub fn describe_explain_analyze_cluster(
449 _scx: &StatementContext,
450 statement: ExplainAnalyzeClusterStatement,
451) -> Result<StatementDesc, PlanError> {
452 if statement.as_sql {
453 let relation_desc = RelationDesc::builder()
454 .with_column("SQL", SqlScalarType::String.nullable(false))
455 .finish();
456 return Ok(StatementDesc::new(Some(relation_desc)));
457 }
458
459 let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
460
461 let mut relation_desc = RelationDesc::builder()
462 .with_column("object", SqlScalarType::String.nullable(false))
463 .with_column("global_id", SqlScalarType::String.nullable(false));
464
465 if skew {
466 relation_desc =
467 relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
468 }
469
470 let mut seen_properties = BTreeSet::new();
471 for property in properties {
472 if !seen_properties.insert(property) {
474 continue;
475 }
476
477 match property {
478 ExplainAnalyzeComputationProperty::Memory if skew => {
479 let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
480 relation_desc = relation_desc
481 .with_column("max_operator_memory_ratio", numeric.clone())
482 .with_column("worker_memory", SqlScalarType::String.nullable(true))
483 .with_column("avg_memory", SqlScalarType::String.nullable(true))
484 .with_column("total_memory", SqlScalarType::String.nullable(true))
485 .with_column("max_operator_records_ratio", numeric.clone())
486 .with_column("worker_records", numeric.clone())
487 .with_column("avg_records", numeric.clone())
488 .with_column("total_records", numeric);
489 }
490 ExplainAnalyzeComputationProperty::Memory => {
491 relation_desc = relation_desc
492 .with_column("total_memory", SqlScalarType::String.nullable(true))
493 .with_column(
494 "total_records",
495 SqlScalarType::Numeric { max_scale: None }.nullable(true),
496 );
497 }
498 ExplainAnalyzeComputationProperty::Cpu if skew => {
499 relation_desc = relation_desc
500 .with_column(
501 "max_operator_cpu_ratio",
502 SqlScalarType::Numeric { max_scale: None }.nullable(true),
503 )
504 .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
505 .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
506 .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
507 }
508 ExplainAnalyzeComputationProperty::Cpu => {
509 relation_desc = relation_desc
510 .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
511 }
512 }
513 }
514
515 Ok(StatementDesc::new(Some(relation_desc.finish())))
516}
517
518pub fn describe_explain_timestamp(
519 scx: &StatementContext,
520 ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
521) -> Result<StatementDesc, PlanError> {
522 let relation_desc = RelationDesc::builder()
523 .with_column("Timestamp", SqlScalarType::String.nullable(false))
524 .finish();
525
526 Ok(StatementDesc::new(Some(relation_desc))
527 .with_params(describe_select(scx, select)?.param_types))
528}
529
530pub fn describe_explain_schema(
531 _: &StatementContext,
532 ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
533) -> Result<StatementDesc, PlanError> {
534 let relation_desc = RelationDesc::builder()
535 .with_column("Schema", SqlScalarType::String.nullable(false))
536 .finish();
537 Ok(StatementDesc::new(Some(relation_desc)))
538}
539
// Extracts the `WITH (...)` options of an `EXPLAIN PLAN` statement into an
// `ExplainPlanOptionExtracted` struct, which is then converted into an
// `ExplainConfig` by the `TryFrom` impl below.
//
// Options declared as `Option<bool>` with `Default(None)` let that conversion
// tell "not specified" apart from an explicit value:
// - `Arity`, `FilterPushdown` and `HumanizedExpressions` fall back to an
//   environment-dependent default when unset.
// - The `Enable*` and `ReoptimizeImportedViews` options are forwarded as-is
//   into `OptimizerFeatureOverrides`.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    // `Raw` is shorthand that turns on both `RawPlans` and `RawSyntax`.
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
582
583impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
584 type Error = PlanError;
585
586 fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
587 if v.raw {
590 v.raw_plans = true;
591 v.raw_syntax = true;
592 }
593
594 let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();
597
598 Ok(ExplainConfig {
599 arity: v.arity.unwrap_or(enable_on_prod),
600 cardinality: v.cardinality,
601 column_names: v.column_names,
602 filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
603 humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
604 join_impls: v.join_implementations,
605 keys: v.keys,
606 linear_chains: !v.raw_plans && v.linear_chains,
607 no_fast_path: v.no_fast_path,
608 no_notices: v.no_notices,
609 node_ids: v.node_identifiers,
610 non_negative: v.non_negative,
611 raw_plans: v.raw_plans,
612 raw_syntax: v.raw_syntax,
613 verbose_syntax: false,
614 redacted: v.redacted,
615 subtree_size: v.subtree_size,
616 equivalences: v.equivalences,
617 timing: v.timing,
618 types: v.types,
619 features: OptimizerFeatureOverrides {
621 enable_eager_delta_joins: v.enable_eager_delta_joins,
622 enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
623 enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
624 enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
625 enable_reduce_mfp_fusion: Default::default(),
626 enable_cardinality_estimates: Default::default(),
627 persist_fast_path_limit: Default::default(),
628 reoptimize_imported_views: v.reoptimize_imported_views,
629 enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
630 enable_projection_pushdown_after_relation_cse: v
631 .enable_projection_pushdown_after_relation_cse,
632 enable_less_reduce_in_eqprop: Default::default(),
633 enable_dequadratic_eqprop_map: Default::default(),
634 enable_eq_classes_withholding_errors: Default::default(),
635 enable_fast_path_plan_insights: Default::default(),
636 enable_cast_elimination: Default::default(),
637 enable_case_literal_transform: Default::default(),
638 enable_simplify_quantified_comparisons: Default::default(),
639 enable_coalesce_case_transform: Default::default(),
640 enable_will_distinct_propagation: Default::default(),
641 },
642 })
643 }
644}
645
646fn plan_explainee(
647 scx: &StatementContext,
648 explainee: Explainee<Aug>,
649 params: &Params,
650) -> Result<plan::Explainee, PlanError> {
651 use crate::plan::ExplaineeStatement;
652
653 let is_replan = matches!(
654 explainee,
655 Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
656 );
657
658 let explainee = match explainee {
659 Explainee::View(name) | Explainee::ReplanView(name) => {
660 let item = scx.get_item_by_resolved_name(&name)?;
661 let item_type = item.item_type();
662 if item_type != CatalogItemType::View {
663 sql_bail!("Expected {name} to be a view, not a {item_type}");
664 }
665 match is_replan {
666 true => crate::plan::Explainee::ReplanView(item.id()),
667 false => crate::plan::Explainee::View(item.id()),
668 }
669 }
670 Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
671 let item = scx.get_item_by_resolved_name(&name)?;
672 let item_type = item.item_type();
673 if item_type != CatalogItemType::MaterializedView {
674 sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
675 }
676 match is_replan {
677 true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
678 false => crate::plan::Explainee::MaterializedView(item.id()),
679 }
680 }
681 Explainee::Index(name) | Explainee::ReplanIndex(name) => {
682 let item = scx.get_item_by_resolved_name(&name)?;
683 let item_type = item.item_type();
684 if item_type != CatalogItemType::Index {
685 sql_bail!("Expected {name} to be an index, not a {item_type}");
686 }
687 match is_replan {
688 true => crate::plan::Explainee::ReplanIndex(item.id()),
689 false => crate::plan::Explainee::Index(item.id()),
690 }
691 }
692 Explainee::Select(select, broken) => {
693 let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
694 crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
695 }
696 Explainee::CreateView(mut stmt, broken) => {
697 if stmt.if_exists != IfExistsBehavior::Skip {
698 stmt.if_exists = IfExistsBehavior::Skip;
703 } else {
704 sql_bail!(
705 "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
706 (the behavior is implied within the scope of an enclosing EXPLAIN)"
707 );
708 }
709
710 let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
711 sql_bail!("expected CreateViewPlan plan");
712 };
713
714 crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
715 }
716 Explainee::CreateMaterializedView(mut stmt, broken) => {
717 if stmt.if_exists != IfExistsBehavior::Skip {
718 stmt.if_exists = IfExistsBehavior::Skip;
723 } else {
724 sql_bail!(
725 "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
726 (the behavior is implied within the scope of an enclosing EXPLAIN)"
727 );
728 }
729
730 let Plan::CreateMaterializedView(plan) =
731 ddl::plan_create_materialized_view(scx, *stmt)?
732 else {
733 sql_bail!("expected CreateMaterializedViewPlan plan");
734 };
735
736 crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
737 broken,
738 plan,
739 })
740 }
741 Explainee::CreateIndex(mut stmt, broken) => {
742 if !stmt.if_not_exists {
743 stmt.if_not_exists = true;
746 } else {
747 sql_bail!(
748 "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
749 (the behavior is implied within the scope of an enclosing EXPLAIN)"
750 );
751 }
752
753 let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
754 sql_bail!("expected CreateIndexPlan plan");
755 };
756
757 crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
758 }
759 Explainee::Subscribe(stmt, broken) => {
760 let Plan::Subscribe(plan) = plan_subscribe(scx, *stmt, params, None)? else {
761 sql_bail!("expected SubscribePlan");
762 };
763 crate::plan::Explainee::Statement(ExplaineeStatement::Subscribe { broken, plan })
764 }
765 };
766
767 Ok(explainee)
768}
769
770pub fn plan_explain_plan(
771 scx: &StatementContext,
772 explain: ExplainPlanStatement<Aug>,
773 params: &Params,
774) -> Result<Plan, PlanError> {
775 let (format, verbose_syntax) = match explain.format() {
776 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
777 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
778 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
779 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
780 };
781 let stage = explain.stage();
782
783 let mut config = {
785 let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
786
787 if !scx.catalog.system_vars().persist_stats_filter_enabled() {
788 with_options.filter_pushdown = Some(false);
790 }
791
792 ExplainConfig::try_from(with_options)?
793 };
794 config.verbose_syntax = verbose_syntax;
795
796 let explainee = plan_explainee(scx, explain.explainee, params)?;
797
798 Ok(Plan::ExplainPlan(ExplainPlanPlan {
799 stage,
800 format,
801 config,
802 explainee,
803 }))
804}
805
806pub fn plan_explain_schema(
807 scx: &StatementContext,
808 explain_schema: ExplainSinkSchemaStatement<Aug>,
809) -> Result<Plan, PlanError> {
810 let ExplainSinkSchemaStatement {
811 schema_for,
812 format: _,
814 mut statement,
815 } = explain_schema;
816
817 statement.name = Some(UnresolvedItemName::qualified(&[
821 ident!("mz_catalog"),
822 ident!("mz_explain_schema"),
823 ]));
824
825 crate::pure::purify_create_sink_avro_doc_on_options(
826 scx.catalog,
827 *statement.from.item_id(),
828 &mut statement.format,
829 )?;
830
831 match ddl::plan_create_sink(scx, statement)? {
832 Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
833 StorageSinkConnection::Kafka(KafkaSinkConnection {
834 format:
835 KafkaSinkFormat {
836 key_format,
837 value_format:
838 KafkaSinkFormatType::Avro {
839 schema: value_schema,
840 ..
841 },
842 ..
843 },
844 ..
845 }) => {
846 let schema = match schema_for {
847 ExplainSinkSchemaFor::Key => key_format
848 .and_then(|f| match f {
849 KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
850 _ => None,
851 })
852 .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
853 ExplainSinkSchemaFor::Value => value_schema,
854 };
855
856 Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
857 sink_from: sink.from,
858 json_schema: schema,
859 }))
860 }
861 _ => bail_unsupported!(
862 "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
863 ),
864 },
865 _ => bail_internal!("plan_sink did not produce a CreateSink plan"),
866 }
867}
868
869pub fn plan_explain_pushdown(
870 scx: &StatementContext,
871 statement: ExplainPushdownStatement<Aug>,
872 params: &Params,
873) -> Result<Plan, PlanError> {
874 scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
875 let explainee = plan_explainee(scx, statement.explainee, params)?;
876 Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
877}
878
879pub fn plan_explain_analyze_object(
880 scx: &StatementContext,
881 statement: ExplainAnalyzeObjectStatement<Aug>,
882 params: &Params,
883) -> Result<Plan, PlanError> {
884 let explainee_name = statement
885 .explainee
886 .name()
887 .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
888 .full_name_str();
889 let explainee = plan_explainee(scx, statement.explainee, params)?;
890
891 let check_ownership = |item_id: &CatalogItemId, item_type: &str| -> Result<(), PlanError> {
892 if scx.catalog.restrict_to_user_objects() {
893 let item = scx.catalog.get_item(item_id);
894 if item.owner_id() != *scx.catalog.active_role_id() {
895 let full_name = scx.catalog.resolve_full_name(item.name());
896 return Err(sql_err!("must be owner of {item_type} {full_name}"));
897 }
898 }
899 Ok(())
900 };
901 match &explainee {
902 plan::Explainee::Index(item_id) => check_ownership(item_id, "INDEX")?,
903 plan::Explainee::MaterializedView(item_id) => {
904 check_ownership(item_id, "MATERIALIZED VIEW")?
905 }
906 _ => return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",)),
907 };
908
909 let mut ctes = Vec::with_capacity(4); let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
924 let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
925 let mut predicates = vec![format!(
926 "mo.name = {}",
927 escaped_string_literal(&explainee_name)
928 )];
929 let mut order_by = vec!["mlm.lir_id DESC"];
930
931 match statement.properties {
932 ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
933 properties,
934 skew,
935 }) => {
936 let mut worker_id = None;
937 let mut seen_properties = BTreeSet::new();
938 for property in properties {
939 if !seen_properties.insert(property) {
941 continue;
942 }
943
944 match property {
945 ExplainAnalyzeComputationProperty::Memory => {
946 ctes.push((
947 "summary_memory",
948 r#"
949 SELECT mlm.global_id AS global_id,
950 mlm.lir_id AS lir_id,
951 SUM(mas.size) AS total_memory,
952 SUM(mas.records) AS total_records,
953 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
954 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
955 FROM mz_introspection.mz_lir_mapping mlm
956 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
957 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
958 ON (mas.operator_id = valid_id)
959GROUP BY mlm.global_id, mlm.lir_id"#,
960 ));
961 from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");
962
963 if skew {
964 ctes.push((
965 "per_worker_memory",
966 r#"
967 SELECT mlm.global_id AS global_id,
968 mlm.lir_id AS lir_id,
969 mas.worker_id AS worker_id,
970 SUM(mas.size) AS worker_memory,
971 SUM(mas.records) AS worker_records
972 FROM mz_introspection.mz_lir_mapping mlm
973 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
974 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
975 ON (mas.operator_id = valid_id)
976GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
977 ));
978 from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");
979
980 if let Some(worker_id) = worker_id {
981 predicates.push(format!(
982 "(pwm.worker_id = {worker_id} OR pwm.worker_id IS NULL OR {worker_id} IS NULL)"
983 ));
984 } else {
985 worker_id = Some("pwm.worker_id");
986 columns.push("pwm.worker_id AS worker_id");
987 order_by.push("worker_id");
988 }
989
990 columns.extend([
991 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
992 "pg_size_pretty(pwm.worker_memory) AS worker_memory",
993 "pg_size_pretty(sm.avg_memory) AS avg_memory",
994 "pg_size_pretty(sm.total_memory) AS total_memory",
995 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
996 "pwm.worker_records AS worker_records",
997 "sm.avg_records AS avg_records",
998 "sm.total_records AS total_records",
999 ]);
1000 } else {
1001 columns.extend([
1002 "pg_size_pretty(sm.total_memory) AS total_memory",
1003 "sm.total_records AS total_records",
1004 ]);
1005 }
1006 }
1007 ExplainAnalyzeComputationProperty::Cpu => {
1008 ctes.push((
1009 "summary_cpu",
1010 r#"
1011 SELECT mlm.global_id AS global_id,
1012 mlm.lir_id AS lir_id,
1013 SUM(mse.elapsed_ns) AS total_ns,
1014 CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
1015 FROM mz_introspection.mz_lir_mapping mlm
1016 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1017 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1018 ON (mse.id = valid_id)
1019GROUP BY mlm.global_id, mlm.lir_id"#,
1020 ));
1021 from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");
1022
1023 if skew {
1024 ctes.push((
1025 "per_worker_cpu",
1026 r#"
1027 SELECT mlm.global_id AS global_id,
1028 mlm.lir_id AS lir_id,
1029 mse.worker_id AS worker_id,
1030 SUM(mse.elapsed_ns) AS worker_ns
1031 FROM mz_introspection.mz_lir_mapping mlm
1032 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1033 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1034 ON (mse.id = valid_id)
1035GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1036 ));
1037 from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");
1038
1039 if let Some(worker_id) = worker_id {
1040 predicates.push(format!(
1041 "(pwc.worker_id = {worker_id} OR pwc.worker_id IS NULL OR {worker_id} IS NULL)"
1042 ));
1043 } else {
1044 worker_id = Some("pwc.worker_id");
1045 columns.push("pwc.worker_id AS worker_id");
1046 order_by.push("worker_id");
1047 }
1048
1049 columns.extend([
1050 "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
1051 "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
1052 "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
1053 ]);
1054 }
1055 columns.push(
1056 "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
1057 );
1058 }
1059 }
1060 }
1061 }
1062 ExplainAnalyzeProperty::Hints => {
1063 columns.extend([
1064 "megsa.levels AS levels",
1065 "megsa.to_cut AS to_cut",
1066 "megsa.hint AS hint",
1067 "pg_size_pretty(megsa.savings) AS savings",
1068 ]);
1069 from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
1070 "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
1071 mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
1072 }
1073 }
1074
1075 from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");
1076
1077 let ctes = if !ctes.is_empty() {
1078 format!(
1079 "WITH {}",
1080 separated(
1081 ",\n",
1082 ctes.iter()
1083 .map(|(name, defn)| format!("{name} AS ({defn})"))
1084 )
1085 )
1086 } else {
1087 String::new()
1088 };
1089 let columns = separated(", ", columns);
1090 let from = separated(" ", from);
1091 let predicates = separated(" AND ", predicates);
1092 let order_by = separated(", ", order_by);
1093 let query = format!(
1094 r#"{ctes}
1095SELECT {columns}
1096FROM {from}
1097WHERE {predicates}
1098ORDER BY {order_by}"#
1099 );
1100
1101 if statement.as_sql {
1102 let rows = vec![Row::pack_slice(&[Datum::String(
1103 &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1104 PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1105 })?,
1106 )])];
1107 let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1108
1109 Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1110 } else {
1111 let (show_select, resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1112 scx.record_sql_impl_ids(&resolved_ids);
1113 show_select.plan()
1114 }
1115}
1116
/// Plans `EXPLAIN ANALYZE CLUSTER`, reporting memory and/or CPU usage for each
/// object in `mz_introspection.mz_mappable_objects`.
///
/// The statement is lowered to a SQL query over introspection relations.
/// `mz_lir_mapping` relates each `(global_id, lir_id)` to a range of dataflow
/// operator IDs (`operator_id_start .. operator_id_end`). The per-worker
/// arrangement sizes (`mz_arrangement_sizes_per_worker`) or scheduling times
/// (`mz_scheduling_elapsed_per_worker`) of those operators are summed per LIR
/// node and then per object.
///
/// Without `skew`, only per-object totals are reported. With `skew`, rows are
/// produced per object and worker. Each row also carries the per-object
/// average and the largest per-LIR-node ratio of the worker's value to that
/// node's per-worker average.
///
/// When `statement.as_sql` is set, the generated SQL is pretty-printed and
/// returned as a single text row instead of being planned.
pub fn plan_explain_analyze_cluster(
    scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
    _params: &Params,
) -> Result<Plan, PlanError> {
    // The final query is assembled from these pieces: CTE definitions as
    // (name, SQL) pairs, SELECT columns, FROM/JOIN clauses, WHERE predicates,
    // and ORDER BY keys.
    let mut ctes = Vec::with_capacity(4);
    let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
    let mut predicates = vec![];
    let mut order_by = vec![];

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
    // Name of the column (e.g. `om.worker_id`) that carries the worker ID of
    // the first skew property planned. Later skew properties are filtered so
    // that their per-worker rows match it.
    let mut worker_id = None;
    // A property listed more than once is only planned once.
    let mut seen_properties = BTreeSet::new();
    for property in properties {
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            ExplainAnalyzeComputationProperty::Memory => {
                if skew {
                    // True when this property introduced the `worker_id`
                    // output column; only then does it also sort by it.
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        // Keep rows whose worker matches the earlier skew
                        // property's worker, or where either side is NULL.
                        predicates.push(format!(
                            "(om.worker_id = {worker_id} OR om.worker_id IS NULL OR {worker_id} IS NULL)"
                        ));
                    } else {
                        worker_id = Some("om.worker_id");
                        columns.push("om.worker_id AS worker_id");
                        set_worker_id = true;
                    };

                    // Per LIR node: arrangement size/records summed over all
                    // workers, plus the average per distinct reporting worker.
                    ctes.push((
                        "per_operator_memory_summary",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mas.size) AS total_memory,
       SUM(mas.records) AS total_records,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
       ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    // Per LIR node and worker: arrangement size and records.
                    ctes.push((
                        "per_operator_memory_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mas.worker_id AS worker_id,
       SUM(mas.size) AS worker_memory,
       SUM(mas.records) AS worker_records
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
       ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                    ));

                    // Per LIR node and worker: the worker's value divided by
                    // the node's per-worker average, rounded to two decimals.
                    // The ratio is NULL when the average is 0.
                    ctes.push((
                        "per_operator_memory_ratios",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.lir_id AS lir_id,
       pompw.worker_id AS worker_id,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
  FROM per_operator_memory_per_worker pompw
  JOIN per_operator_memory_summary poms
 USING (global_id, lir_id)
"#,
                    ));

                    // Per object and worker: the largest per-node ratio, and
                    // memory/records summed over the object's LIR nodes.
                    ctes.push((
                        "object_memory",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.worker_id AS worker_id,
       MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
       MAX(pomr.records_ratio) AS max_operator_records_ratio,
       SUM(pompw.worker_memory) AS worker_memory,
       SUM(pompw.worker_records) AS worker_records
FROM per_operator_memory_per_worker pompw
     JOIN per_operator_memory_ratios pomr
    USING (global_id, worker_id, lir_id)
GROUP BY pompw.global_id, pompw.worker_id
"#,
                    ));

                    // Per object: totals over all workers and the average per
                    // distinct worker.
                    ctes.push((
                        "object_average_memory",
                        r#"
SELECT om.global_id AS global_id,
       SUM(om.worker_memory) AS total_memory,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
       SUM(om.worker_records) AS total_records,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
  FROM object_memory om
GROUP BY om.global_id"#
                    ));

                    from.push("LEFT JOIN object_memory om USING (global_id)");
                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");

                    columns.extend([
                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
                        "pg_size_pretty(om.worker_memory) AS worker_memory",
                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
                        "pg_size_pretty(oam.total_memory) AS total_memory",
                        "om.max_operator_records_ratio AS max_operator_records_ratio",
                        "om.worker_records AS worker_records",
                        "oam.avg_records AS avg_records",
                        "oam.total_records AS total_records",
                    ]);

                    // `om.worker_memory` is qualified so that rows sort by the
                    // raw byte count. An unqualified name would sort by the
                    // `pg_size_pretty` output column of the same name.
                    order_by.extend([
                        "max_operator_memory_ratio DESC NULLS LAST",
                        "max_operator_records_ratio DESC NULLS LAST",
                        "om.worker_memory DESC NULLS LAST",
                        "worker_records DESC NULLS LAST",
                    ]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // Per LIR node: arrangement size/records summed over all
                    // workers.
                    ctes.push((
                        "per_operator_memory_totals",
                        r#"
    SELECT mlm.global_id AS global_id,
           mlm.lir_id AS lir_id,
           SUM(mas.size) AS total_memory,
           SUM(mas.records) AS total_records
    FROM mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
         JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
           ON (mas.operator_id = valid_id)
    GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    // Per object: the sum of its LIR nodes' totals.
                    ctes.push((
                        "object_memory_totals",
                        r#"
SELECT pomt.global_id AS global_id,
       SUM(pomt.total_memory) AS total_memory,
       SUM(pomt.total_records) AS total_records
FROM per_operator_memory_totals pomt
GROUP BY pomt.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
                    columns.extend([
                        "pg_size_pretty(omt.total_memory) AS total_memory",
                        "omt.total_records AS total_records",
                    ]);
                    // Qualified to sort by bytes, not by the pretty-printed text.
                    order_by.extend([
                        "omt.total_memory DESC NULLS LAST",
                        "total_records DESC NULLS LAST",
                    ]);
                }
            }
            ExplainAnalyzeComputationProperty::Cpu => {
                if skew {
                    // True when this property introduced the `worker_id`
                    // output column; only then does it also sort by it.
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        // Keep rows whose worker matches the earlier skew
                        // property's worker, or where either side is NULL.
                        predicates.push(format!(
                            "(oc.worker_id = {worker_id} OR oc.worker_id IS NULL OR {worker_id} IS NULL)"
                        ));
                    } else {
                        worker_id = Some("oc.worker_id");
                        columns.push("oc.worker_id AS worker_id");
                        set_worker_id = true;
                    };

                    // Per LIR node: scheduling time summed over all workers,
                    // plus the average per distinct reporting worker.
                    ctes.push((
                        "per_operator_cpu_summary",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mse.elapsed_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
FROM mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
       ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    // Per LIR node and worker: scheduling time.
                    ctes.push((
                        "per_operator_cpu_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mse.worker_id AS worker_id,
       SUM(mse.elapsed_ns) AS worker_ns
FROM mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
       ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                    ));

                    // Per LIR node and worker: the worker's time divided by the
                    // node's per-worker average, rounded to two decimals. The
                    // ratio is NULL when the average is 0.
                    ctes.push((
                        "per_operator_cpu_ratios",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.lir_id AS lir_id,
       pocpw.worker_id AS worker_id,
       CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
FROM per_operator_cpu_per_worker pocpw
     JOIN per_operator_cpu_summary pocs
    USING (global_id, lir_id)
"#,
                    ));

                    // Per object and worker: the largest per-node CPU ratio and
                    // the summed time. The `pomr` alias here refers to
                    // `per_operator_cpu_ratios`.
                    ctes.push((
                        "object_cpu",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.worker_id AS worker_id,
       MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
       SUM(pocpw.worker_ns) AS worker_ns
FROM per_operator_cpu_per_worker pocpw
     JOIN per_operator_cpu_ratios pomr
    USING (global_id, worker_id, lir_id)
GROUP BY pocpw.global_id, pocpw.worker_id
"#,
                    ));

                    // Per object: total time and the average per distinct worker.
                    ctes.push((
                        "object_average_cpu",
                        r#"
SELECT oc.global_id AS global_id,
       SUM(oc.worker_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
  FROM object_cpu oc
GROUP BY oc.global_id"#,
                    ));

                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");

                    // Nanosecond counts are rendered as `interval`s.
                    columns.extend([
                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
                    ]);

                    order_by.extend([
                        "max_operator_cpu_ratio DESC NULLS LAST",
                        "worker_elapsed DESC NULLS LAST",
                    ]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // Per LIR node: scheduling time summed over all workers.
                    ctes.push((
                        "per_operator_cpu_totals",
                        r#"
    SELECT mlm.global_id AS global_id,
           mlm.lir_id AS lir_id,
           SUM(mse.elapsed_ns) AS total_ns
    FROM mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
         JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
           ON (mse.id = valid_id)
    GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    // Per object: the sum of its LIR nodes' totals.
                    ctes.push((
                        "object_cpu_totals",
                        r#"
SELECT poct.global_id AS global_id,
       SUM(poct.total_ns) AS total_ns
FROM per_operator_cpu_totals poct
GROUP BY poct.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
                    columns
                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
                    order_by.extend(["total_elapsed DESC NULLS LAST"]);
                }
            }
        }
    }

    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    // Predicates exist only when a second skew property joined on `worker_id`.
    // Otherwise the WHERE clause is omitted entirely.
    let predicates = if !predicates.is_empty() {
        format!("WHERE {}", separated(" AND ", predicates))
    } else {
        String::new()
    };
    // Final tiebreaker, which also guarantees ORDER BY is never empty.
    order_by.push("mo.name DESC");
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
{predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // Return the generated SQL itself, pretty-printed at width 80.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        let (show_select, resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        scx.record_sql_impl_ids(&resolved_ids);
        show_select.plan()
    }
}
1493
1494pub fn plan_explain_timestamp(
1495 scx: &StatementContext,
1496 explain: ExplainTimestampStatement<Aug>,
1497) -> Result<Plan, PlanError> {
1498 let (format, _verbose_syntax) = match explain.format() {
1499 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1500 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1501 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1502 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1503 };
1504
1505 let raw_plan = {
1506 let query::PlannedRootQuery {
1507 expr: raw_plan,
1508 desc: _,
1509 finishing: _,
1510 scope: _,
1511 } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1512 if raw_plan.contains_parameters()? {
1513 return Err(PlanError::ParameterNotAllowed(
1514 "EXPLAIN TIMESTAMP".to_string(),
1515 ));
1516 }
1517
1518 raw_plan
1519 };
1520 let when = query::plan_as_of(scx, explain.select.as_of)?;
1521
1522 Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1523 format,
1524 raw_plan,
1525 when,
1526 }))
1527}
1528
// Generates `SubscribeOptionExtracted` from a `SUBSCRIBE` statement's
// `SubscribeOption` list. It is obtained via `try_into()` on the statement's
// options and exposes `snapshot` and `progress` as optional booleans, which
// callers default with `unwrap_or`.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1530
1531pub fn describe_subscribe(
1532 scx: &StatementContext,
1533 stmt: SubscribeStatement<Aug>,
1534) -> Result<StatementDesc, PlanError> {
1535 let relation_desc = match stmt.relation {
1536 SubscribeRelation::Name(name) => {
1537 let item = scx.get_item_by_resolved_name(&name)?;
1538 match item.relation_desc() {
1539 Some(desc) => desc.into_owned(),
1540 None => sql_bail!(
1541 "'{}' cannot be subscribed to because it is a {}",
1542 name.full_name_str(),
1543 item.item_type(),
1544 ),
1545 }
1546 }
1547 SubscribeRelation::Query(query) => {
1548 let query::PlannedRootQuery { desc, .. } =
1549 query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
1550 desc
1551 }
1552 };
1553 let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
1554 let progress = progress.unwrap_or(false);
1555 let mut desc = RelationDesc::builder().with_column(
1556 "mz_timestamp",
1557 SqlScalarType::Numeric {
1558 max_scale: Some(NumericMaxScale::ZERO),
1559 }
1560 .nullable(false),
1561 );
1562 if progress {
1563 desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
1564 }
1565
1566 let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
1567 match stmt.output {
1568 SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
1569 desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
1570 for (name, mut ty) in relation_desc.into_iter() {
1571 if progress {
1572 ty.nullable = true;
1573 }
1574 desc = desc.with_column(name, ty);
1575 }
1576 }
1577 SubscribeOutput::EnvelopeUpsert { key_columns }
1578 | SubscribeOutput::EnvelopeDebezium { key_columns } => {
1579 desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
1580 let key_columns = key_columns
1581 .into_iter()
1582 .map(normalize::column_name)
1583 .collect_vec();
1584 let mut before_values_desc = RelationDesc::builder();
1585 let mut after_values_desc = RelationDesc::builder();
1586
1587 for column_name in &key_columns {
1589 let mut column_ty = relation_desc
1590 .get_by_name(column_name)
1591 .map(|(_pos, ty)| ty.clone())
1592 .ok_or_else(|| PlanError::UnknownColumn {
1593 table: None,
1594 column: column_name.clone(),
1595 similar: Box::new([]),
1596 })?;
1597 if progress {
1598 column_ty.nullable = true;
1599 }
1600 desc = desc.with_column(column_name, column_ty);
1601 }
1602
1603 for (mut name, mut ty) in relation_desc
1606 .into_iter()
1607 .filter(|(name, _ty)| !key_columns.contains(name))
1608 {
1609 ty.nullable = true;
1610 before_values_desc =
1611 before_values_desc.with_column(format!("before_{}", name), ty.clone());
1612 if debezium {
1613 name = format!("after_{}", name).into();
1614 }
1615 after_values_desc = after_values_desc.with_column(name, ty);
1616 }
1617
1618 if debezium {
1619 desc = desc.concat(before_values_desc);
1620 }
1621 desc = desc.concat(after_values_desc);
1622 }
1623 }
1624 Ok(StatementDesc::new(Some(desc.finish())))
1625}
1626
1627pub fn plan_subscribe(
1628 scx: &StatementContext,
1629 SubscribeStatement {
1630 relation,
1631 options,
1632 as_of,
1633 up_to,
1634 output,
1635 }: SubscribeStatement<Aug>,
1636 params: &Params,
1637 copy_to: Option<CopyFormat>,
1638) -> Result<Plan, PlanError> {
1639 let (from, desc, scope) = match relation {
1640 SubscribeRelation::Name(name) => {
1641 let item = scx.get_item_by_resolved_name(&name)?;
1642 let Some(desc) = item.relation_desc() else {
1643 sql_bail!(
1644 "'{}' cannot be subscribed to because it is a {}",
1645 name.full_name_str(),
1646 item.item_type(),
1647 );
1648 };
1649 let item_name = match name {
1650 ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
1651 _ => None,
1652 };
1653 let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
1654 (
1655 SubscribeFrom::Id(item.global_id()),
1656 desc.into_owned(),
1657 scope,
1658 )
1659 }
1660 SubscribeRelation::Query(query) => {
1661 #[allow(deprecated)] let query::PlannedRootQuery {
1663 mut expr,
1664 desc,
1665 finishing,
1666 scope,
1667 } = query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
1668 expr.bind_parameters_and_simplify_offset(scx, QueryLifetime::Subscribe, params)?;
1669 let query = query::PlannedRootQuery {
1670 expr,
1671 desc,
1672 finishing,
1673 scope,
1674 };
1675 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
1679 &query.finishing,
1680 query.desc.arity()
1681 ));
1682 let desc = query.desc.clone();
1683 (
1684 SubscribeFrom::Query {
1685 expr: query.expr,
1686 desc: query.desc,
1687 },
1688 desc,
1689 query.scope,
1690 )
1691 }
1692 };
1693
1694 let when = query::plan_as_of(scx, as_of)?;
1695 let up_to = up_to
1696 .map(|up_to| plan_as_of_or_up_to(scx, up_to))
1697 .transpose()?;
1698
1699 let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
1700 let ecx = ExprContext {
1701 qcx: &qcx,
1702 name: "",
1703 scope: &scope,
1704 relation_type: desc.typ(),
1705 allow_aggregates: false,
1706 allow_subqueries: true,
1707 allow_parameters: true,
1708 allow_windows: false,
1709 };
1710
1711 let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1712 let output = match output {
1713 SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
1714 SubscribeOutput::EnvelopeUpsert { key_columns } => {
1715 let order_by = key_columns
1716 .iter()
1717 .map(|ident| OrderByExpr {
1718 expr: Expr::Identifier(vec![ident.clone()]),
1719 asc: None,
1720 nulls_last: None,
1721 })
1722 .collect_vec();
1723 let (order_by, map_exprs) = query::plan_order_by_exprs(
1724 &ExprContext {
1725 name: "ENVELOPE UPSERT KEY clause",
1726 ..ecx
1727 },
1728 &order_by[..],
1729 &output_columns[..],
1730 )?;
1731 if !map_exprs.is_empty() {
1732 return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
1733 }
1734 check_distinct_key_columns(&order_by, &output_columns)?;
1735 plan::SubscribeOutput::EnvelopeUpsert {
1736 order_by_keys: order_by,
1737 }
1738 }
1739 SubscribeOutput::EnvelopeDebezium { key_columns } => {
1740 scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
1741 let order_by = key_columns
1742 .iter()
1743 .map(|ident| OrderByExpr {
1744 expr: Expr::Identifier(vec![ident.clone()]),
1745 asc: None,
1746 nulls_last: None,
1747 })
1748 .collect_vec();
1749 let (order_by, map_exprs) = query::plan_order_by_exprs(
1750 &ExprContext {
1751 name: "ENVELOPE DEBEZIUM KEY clause",
1752 ..ecx
1753 },
1754 &order_by[..],
1755 &output_columns[..],
1756 )?;
1757 if !map_exprs.is_empty() {
1758 return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
1759 }
1760 check_distinct_key_columns(&order_by, &output_columns)?;
1761 plan::SubscribeOutput::EnvelopeDebezium {
1762 order_by_keys: order_by,
1763 }
1764 }
1765 SubscribeOutput::WithinTimestampOrderBy { order_by } => {
1766 scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
1767 let mz_diff = "mz_diff".into();
1768 let output_columns = std::iter::once((0, &mz_diff))
1769 .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
1770 .collect_vec();
1771 match query::plan_order_by_exprs(
1772 &ExprContext {
1773 name: "WITHIN TIMESTAMP ORDER BY clause",
1774 ..ecx
1775 },
1776 &order_by[..],
1777 &output_columns[..],
1778 ) {
1779 Err(PlanError::UnknownColumn {
1780 table: None,
1781 column,
1782 similar: _,
1783 }) if &column == &mz_diff => {
1784 return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1787 }
1788 Err(e) => return Err(e),
1789 Ok((order_by, map_exprs)) => {
1790 if !map_exprs.is_empty() {
1791 return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1792 }
1793
1794 plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
1795 }
1796 }
1797 }
1798 };
1799
1800 let SubscribeOptionExtracted {
1801 progress, snapshot, ..
1802 } = options.try_into()?;
1803 Ok(Plan::Subscribe(SubscribePlan {
1804 from,
1805 when,
1806 up_to,
1807 with_snapshot: snapshot.unwrap_or(true),
1808 copy_to,
1809 emit_progress: progress.unwrap_or(false),
1810 output,
1811 }))
1812}
1813
1814fn check_distinct_key_columns(
1817 order_by: &[ColumnOrder],
1818 output_columns: &[(usize, &mz_repr::ColumnName)],
1819) -> Result<(), PlanError> {
1820 let mut seen = BTreeSet::new();
1821 for co in order_by {
1822 if !seen.insert(co.column) {
1823 return Err(PlanError::DuplicateKeyColumnInSubscribeEnvelope {
1824 column_name: output_columns[co.column].1.to_string(),
1825 });
1826 }
1827 }
1828 Ok(())
1829}
1830
1831pub fn describe_copy_from_table(
1832 scx: &StatementContext,
1833 table_name: <Aug as AstInfo>::ItemName,
1834 columns: Vec<Ident>,
1835) -> Result<StatementDesc, PlanError> {
1836 let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1837 Ok(StatementDesc::new(Some(desc)))
1838}
1839
1840pub fn describe_copy_item(
1841 scx: &StatementContext,
1842 object_name: <Aug as AstInfo>::ItemName,
1843 columns: Vec<Ident>,
1844) -> Result<StatementDesc, PlanError> {
1845 let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1846 Ok(StatementDesc::new(Some(desc)))
1847}
1848
1849pub fn describe_copy(
1850 scx: &StatementContext,
1851 CopyStatement {
1852 relation,
1853 direction,
1854 ..
1855 }: CopyStatement<Aug>,
1856) -> Result<StatementDesc, PlanError> {
1857 Ok(match (relation, direction) {
1858 (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1859 describe_copy_item(scx, name, columns)?
1860 }
1861 (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1862 describe_copy_from_table(scx, name, columns)?
1863 }
1864 (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1865 (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1866 }
1867 .with_is_copy())
1868}
1869
1870fn plan_copy_to_expr(
1871 scx: &StatementContext,
1872 select_plan: SelectPlan,
1873 desc: RelationDesc,
1874 to: &Expr<Aug>,
1875 format: CopyFormat,
1876 options: CopyOptionExtracted,
1877) -> Result<Plan, PlanError> {
1878 let conn_id = match options.aws_connection {
1879 Some(conn_id) => CatalogItemId::from(conn_id),
1880 None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1881 };
1882 let connection = scx.get_item(&conn_id).connection()?;
1883
1884 match connection {
1885 mz_storage_types::connections::Connection::Aws(_) => {}
1886 _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1887 }
1888
1889 let format = match format {
1890 CopyFormat::Csv => {
1891 let quote = extract_byte_param_value(options.quote, "quote")?;
1892 let escape = extract_byte_param_value(options.escape, "escape")?;
1893 let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1894 S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1895 CopyCsvFormatParams::try_new(
1896 delimiter,
1897 quote,
1898 escape,
1899 options.header,
1900 options.null,
1901 )
1902 .map_err(|e| sql_err!("{}", e))?,
1903 ))
1904 }
1905 CopyFormat::Parquet => {
1906 ArrowBuilder::validate_desc_for_parquet(&desc, |_| None)
1909 .map_err(|e| sql_err!("{}", e))?;
1910 S3SinkFormat::Parquet
1911 }
1912 CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1913 CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1914 };
1915
1916 let mut to_expr = to.clone();
1918 transform_ast::transform(scx, &mut to_expr)?;
1919 let relation_type = RelationDesc::empty();
1920 let ecx = &ExprContext {
1921 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1922 name: "COPY TO target",
1923 scope: &Scope::empty(),
1924 relation_type: relation_type.typ(),
1925 allow_aggregates: false,
1926 allow_subqueries: false,
1927 allow_parameters: false,
1928 allow_windows: false,
1929 };
1930
1931 let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;
1932
1933 if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1934 sql_bail!(
1935 "MAX FILE SIZE cannot be less than {}",
1936 MIN_S3_SINK_FILE_SIZE
1937 );
1938 }
1939 if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1940 sql_bail!(
1941 "MAX FILE SIZE cannot be greater than {}",
1942 MAX_S3_SINK_FILE_SIZE
1943 );
1944 }
1945
1946 Ok(Plan::CopyTo(CopyToPlan {
1947 select_plan,
1948 desc,
1949 to,
1950 connection: connection.to_owned(),
1951 connection_id: conn_id,
1952 format,
1953 max_file_size: options.max_file_size.as_bytes(),
1954 }))
1955}
1956
1957fn plan_copy_from(
1958 scx: &StatementContext,
1959 target: &CopyTarget<Aug>,
1960 table_name: ResolvedItemName,
1961 columns: Vec<Ident>,
1962 format: Option<CopyFormat>,
1963 options: CopyOptionExtracted,
1964) -> Result<Plan, PlanError> {
1965 fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
1966 match option {
1967 Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
1968 None => Ok(()),
1969 }
1970 }
1971
1972 let source = match target {
1973 CopyTarget::Stdin => CopyFromSource::Stdin,
1974 CopyTarget::Expr(from) => {
1975 let mut from_expr = from.clone();
1977 transform_ast::transform(scx, &mut from_expr)?;
1978 let relation_type = RelationDesc::empty();
1979 let ecx = &ExprContext {
1980 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1981 name: "COPY FROM target",
1982 scope: &Scope::empty(),
1983 relation_type: relation_type.typ(),
1984 allow_aggregates: false,
1985 allow_subqueries: false,
1986 allow_parameters: false,
1987 allow_windows: false,
1988 };
1989 let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;
1990
1991 match options.aws_connection {
1992 Some(conn_id) => {
1993 let conn_id = CatalogItemId::from(conn_id);
1994
1995 let connection = match scx.get_item(&conn_id).connection()? {
1997 mz_storage_types::connections::Connection::Aws(conn) => conn,
1998 _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
1999 };
2000
2001 CopyFromSource::AwsS3 {
2002 uri: from,
2003 connection,
2004 connection_id: conn_id,
2005 }
2006 }
2007 None => CopyFromSource::Url(from),
2008 }
2009 }
2010 CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
2011 };
2012
2013 let format = match &source {
2018 CopyFromSource::Stdin => format.unwrap_or(CopyFormat::Text),
2019 CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => match format {
2020 None => sql_bail!("COPY FROM <expr> requires a FORMAT option"),
2021 Some(CopyFormat::Text) => bail_unsupported!("FORMAT TEXT"),
2022 Some(CopyFormat::Binary) => bail_unsupported!("FORMAT BINARY"),
2023 Some(format @ (CopyFormat::Csv | CopyFormat::Parquet)) => format,
2024 },
2025 };
2026
2027 let params = match format {
2028 CopyFormat::Text => {
2029 only_available_with_csv(options.quote, "quote")?;
2030 only_available_with_csv(options.escape, "escape")?;
2031 only_available_with_csv(options.header, "HEADER")?;
2032 let delimiter =
2033 extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
2034 let null = match options.null {
2035 Some(null) => Cow::from(null),
2036 None => Cow::from("\\N"),
2037 };
2038 CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
2039 }
2040 CopyFormat::Csv => {
2041 let quote = extract_byte_param_value(options.quote, "quote")?;
2042 let escape = extract_byte_param_value(options.escape, "escape")?;
2043 let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
2044 CopyFormatParams::Csv(
2045 CopyCsvFormatParams::try_new(
2046 delimiter,
2047 quote,
2048 escape,
2049 options.header,
2050 options.null,
2051 )
2052 .map_err(|e| sql_err!("{}", e))?,
2053 )
2054 }
2055 CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
2056 CopyFormat::Parquet => CopyFormatParams::Parquet,
2057 };
2058
2059 let filter = match (options.files, options.pattern) {
2060 (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
2061 (Some(files), None) => Some(CopyFromFilter::Files(files)),
2062 (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
2063 (None, None) => None,
2064 };
2065
2066 if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
2067 bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
2068 }
2069
2070 let table_name_string = table_name.full_name_str();
2071
2072 let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;
2073
2074 let Some(mfp) = maybe_mfp else {
2075 sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
2076 };
2077
2078 Ok(Plan::CopyFrom(CopyFromPlan {
2079 target_id: id,
2080 target_name: table_name_string,
2081 source,
2082 columns,
2083 source_desc,
2084 mfp,
2085 params,
2086 filter,
2087 }))
2088}
2089
2090fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2091 match v {
2092 Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2093 Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2094 None => Ok(None),
2095 }
2096}
2097
// Declares the options accepted by `COPY` and generates `CopyOptionExtracted`.
// That is the typed view of those options which `plan_copy` builds via
// `CopyOptionExtracted::try_from`. The planning code reads it as snake_case
// fields (`options.format`, `options.delimiter`, `options.files`, ...).
//
// Every option except `MaxFileSize` is read as an `Option<_>` by the planning
// code. `MaxFileSize` instead defaults to `ByteSize::mb(256)` when omitted.
generate_extracted_config!(
    CopyOption,
    // Parsed case-insensitively into a `CopyFormat` by `plan_copy`.
    (Format, String),
    // For `COPY ... FROM`, converted to a single byte via `extract_byte_param_value`.
    (Delimiter, String),
    (Null, String),
    // `Escape`, `Quote` and `Header` are only accepted with the CSV format in `COPY ... FROM`.
    (Escape, String),
    (Quote, String),
    (Header, bool),
    // For `COPY ... FROM`, must resolve to an AWS connection in the catalog.
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    // `Files` and `Pattern` are mutually exclusive filters for `COPY ... FROM`,
    // and are rejected when copying from STDIN.
    (Files, Vec<String>),
    (Pattern, String)
);
2111
2112pub fn plan_copy(
2113 scx: &StatementContext,
2114 CopyStatement {
2115 relation,
2116 direction,
2117 target,
2118 options,
2119 }: CopyStatement<Aug>,
2120) -> Result<Plan, PlanError> {
2121 let options = CopyOptionExtracted::try_from(options)?;
2122 let format = options
2125 .format
2126 .as_ref()
2127 .map(|format| match format.to_lowercase().as_str() {
2128 "text" => Ok(CopyFormat::Text),
2129 "csv" => Ok(CopyFormat::Csv),
2130 "binary" => Ok(CopyFormat::Binary),
2131 "parquet" => Ok(CopyFormat::Parquet),
2132 _ => sql_bail!("unknown FORMAT: {}", format),
2133 })
2134 .transpose()?;
2135
2136 match (&direction, &target) {
2137 (CopyDirection::To, CopyTarget::Stdout) => {
2138 if options.delimiter.is_some() {
2139 sql_bail!("COPY TO does not support DELIMITER option yet");
2140 }
2141 if options.quote.is_some() {
2142 sql_bail!("COPY TO does not support QUOTE option yet");
2143 }
2144 if options.escape.is_some() {
2145 sql_bail!("COPY TO does not support ESCAPE option yet");
2146 }
2147 if options.null.is_some() {
2148 sql_bail!("COPY TO does not support NULL option yet");
2149 }
2150 if options.header == Some(true) {
2154 sql_bail!("COPY TO does not support HEADER option yet");
2155 }
2156 match relation {
2157 CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
2158 CopyRelation::Select(stmt) => Ok(plan_select(
2159 scx,
2160 stmt,
2161 &Params::empty(),
2162 Some(format.unwrap_or(CopyFormat::Text)),
2163 )?),
2164 CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
2165 scx,
2166 stmt,
2167 &Params::empty(),
2168 Some(format.unwrap_or(CopyFormat::Text)),
2169 )?),
2170 }
2171 }
2172 (CopyDirection::From, target) => match relation {
2173 CopyRelation::Named { name, columns } => {
2174 plan_copy_from(scx, target, name, columns, format, options)
2175 }
2176 _ => sql_bail!("COPY FROM {} not supported", target),
2177 },
2178 (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
2179 let format = match format {
2180 Some(inner) => inner,
2181 _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
2182 };
2183
2184 let stmt = match relation {
2185 CopyRelation::Named { name, columns } => {
2186 if !columns.is_empty() {
2187 sql_bail!(
2189 "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
2190 );
2191 }
2192 let query = Query {
2194 ctes: CteBlock::empty(),
2195 body: SetExpr::Table(name),
2196 order_by: vec![],
2197 limit: None,
2198 offset: None,
2199 };
2200 SelectStatement { query, as_of: None }
2201 }
2202 CopyRelation::Select(stmt) => {
2203 if !stmt.query.order_by.is_empty() {
2204 sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
2205 }
2206 stmt
2207 }
2208 CopyRelation::Subscribe(_) => {
2209 sql_bail!("COPY {} {} not supported", direction, target)
2210 }
2211 };
2212
2213 let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
2214 plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
2215 }
2216 _ => sql_bail!("COPY {} {} not supported", direction, target),
2217 }
2218}