1use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19use mz_arrow_util::builder::ArrowBuilder;
20use mz_expr::RowSetFinishing;
21use mz_expr::visit::Visit;
22use mz_ore::num::NonNeg;
23use mz_ore::soft_panic_or_log;
24use mz_ore::str::separated;
25use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
26use mz_repr::adt::numeric::NumericMaxScale;
27use mz_repr::bytes::ByteSize;
28use mz_repr::explain::{ExplainConfig, ExplainFormat};
29use mz_repr::optimize::OptimizerFeatureOverrides;
30use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
31use mz_sql_parser::ast::{
32 CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
33 ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
34 ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
35 ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
36 SetExpr, SubscribeOutput, UnresolvedItemName,
37};
38use mz_sql_parser::ident;
39use mz_storage_types::sinks::{
40 KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
41 MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
42};
43
44use crate::ast::display::{AstDisplay, escaped_string_literal};
45use crate::ast::{
46 AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
47 DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
48 SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
49 UpdateStatement,
50};
51use crate::catalog::CatalogItemType;
52use crate::names::{Aug, ResolvedItemName};
53use crate::normalize;
54use crate::plan::query::{
55 ExprContext, QueryLifetime, offset_into_value, plan_as_of_or_up_to, plan_expr,
56};
57use crate::plan::scope::Scope;
58use crate::plan::statement::show::ShowSelect;
59use crate::plan::statement::{StatementContext, StatementDesc, ddl};
60use crate::plan::{
61 self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
62 ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
63};
64use crate::plan::{
65 CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
66 QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
67};
68use crate::plan::{CopyFromSource, with_options};
69use crate::session::vars::{
70 self, DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF, ENABLE_COPY_FROM_REMOTE,
71};
72
73pub fn describe_insert(
80 scx: &StatementContext,
81 InsertStatement {
82 table_name,
83 columns,
84 source,
85 returning,
86 }: InsertStatement<Aug>,
87) -> Result<StatementDesc, PlanError> {
88 let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
89 let desc = if returning.expr.is_empty() {
90 None
91 } else {
92 Some(returning.desc)
93 };
94 Ok(StatementDesc::new(desc))
95}
96
97pub fn plan_insert(
98 scx: &StatementContext,
99 InsertStatement {
100 table_name,
101 columns,
102 source,
103 returning,
104 }: InsertStatement<Aug>,
105 params: &Params,
106) -> Result<Plan, PlanError> {
107 let (id, mut expr, returning) =
108 query::plan_insert_query(scx, table_name, columns, source, returning)?;
109 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
110 let returning = returning
111 .expr
112 .into_iter()
113 .map(|mut expr| {
114 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
115 expr.lower_uncorrelated(scx.catalog.system_vars())
116 })
117 .collect::<Result<Vec<_>, _>>()?;
118
119 Ok(Plan::Insert(InsertPlan {
120 id,
121 values: expr,
122 returning,
123 }))
124}
125
126pub fn describe_delete(
127 scx: &StatementContext,
128 stmt: DeleteStatement<Aug>,
129) -> Result<StatementDesc, PlanError> {
130 query::plan_delete_query(scx, stmt)?;
131 Ok(StatementDesc::new(None))
132}
133
134pub fn plan_delete(
135 scx: &StatementContext,
136 stmt: DeleteStatement<Aug>,
137 params: &Params,
138) -> Result<Plan, PlanError> {
139 let rtw_plan = query::plan_delete_query(scx, stmt)?;
140 plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
141}
142
143pub fn describe_update(
144 scx: &StatementContext,
145 stmt: UpdateStatement<Aug>,
146) -> Result<StatementDesc, PlanError> {
147 query::plan_update_query(scx, stmt)?;
148 Ok(StatementDesc::new(None))
149}
150
151pub fn plan_update(
152 scx: &StatementContext,
153 stmt: UpdateStatement<Aug>,
154 params: &Params,
155) -> Result<Plan, PlanError> {
156 let rtw_plan = query::plan_update_query(scx, stmt)?;
157 plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
158}
159
160pub fn plan_read_then_write(
161 scx: &StatementContext,
162 kind: MutationKind,
163 params: &Params,
164 query::ReadThenWritePlan {
165 id,
166 mut selection,
167 finishing,
168 assignments,
169 }: query::ReadThenWritePlan,
170) -> Result<Plan, PlanError> {
171 selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
172 let mut assignments_outer = BTreeMap::new();
173 for (idx, mut set) in assignments {
174 set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
175 let set = set.lower_uncorrelated(scx.catalog.system_vars())?;
176 assignments_outer.insert(idx, set);
177 }
178
179 Ok(Plan::ReadThenWrite(ReadThenWritePlan {
180 id,
181 selection,
182 finishing,
183 assignments: assignments_outer,
184 kind,
185 returning: Vec::new(),
186 }))
187}
188
189pub fn describe_select(
190 scx: &StatementContext,
191 stmt: SelectStatement<Aug>,
192) -> Result<StatementDesc, PlanError> {
193 if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
194 return Ok(StatementDesc::new(Some(desc)));
195 }
196
197 let query::PlannedRootQuery { desc, .. } =
198 query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
199 Ok(StatementDesc::new(Some(desc)))
200}
201
202pub fn plan_select(
203 scx: &StatementContext,
204 select: SelectStatement<Aug>,
205 params: &Params,
206 copy_to: Option<CopyFormat>,
207) -> Result<Plan, PlanError> {
208 if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
209 return Ok(Plan::SideEffectingFunc(f));
210 }
211
212 let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
213 Ok(Plan::Select(plan))
214}
215
/// Plans a `SELECT` statement into a [`SelectPlan`] plus the description of
/// the rows it produces.
///
/// Binds placeholder parameters everywhere they can occur (the query body,
/// `LIMIT`, `OFFSET`, and `TopK` offsets inside the expression tree), and
/// requires the top-level `LIMIT` to reduce to a literal.
fn plan_select_inner(
    scx: &StatementContext,
    select: SelectStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<(SelectPlan, RelationDesc), PlanError> {
    let when = query::plan_as_of(scx, select.as_of.clone())?;
    let lifetime = QueryLifetime::OneShot;
    let query::PlannedRootQuery {
        mut expr,
        desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
    expr.bind_parameters(scx, lifetime, params)?;

    // Evaluate every `TopK` offset to a value and replace it with an Int64
    // literal. (Parameters were bound into `expr` just above, so the offset
    // expressions can now be reduced.)
    expr.try_visit_mut_pre(&mut |expr| {
        if let HirRelationExpr::TopK { offset, .. } = expr {
            let offset_value = offset_into_value(offset.take())?;
            *offset = HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64);
        }
        Ok::<(), PlanError>(())
    })?;
    // The top-level LIMIT must reduce to a literal: NULL means "no limit",
    // and only non-negative Int64 values are accepted.
    let limit = match finishing.limit {
        None => None,
        Some(mut limit) => {
            limit.bind_parameters(scx, lifetime, params)?;
            let Some(limit) = limit.as_literal() else {
                sql_bail!(
                    "Top-level LIMIT must be a constant expression, got {}",
                    limit
                )
            };
            match limit {
                Datum::Null => None,
                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
                _ => {
                    // Upstream validation should have rejected this already.
                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
                    sql_bail!("LIMIT must be a non-negative INT or NULL")
                }
            }
        }
    };
    // The top-level OFFSET is likewise bound and reduced to a value.
    let offset = {
        let mut offset = finishing.offset.clone();
        offset.bind_parameters(scx, lifetime, params)?;
        let offset = offset_into_value(offset.take())?;
        offset
            .try_into()
            .expect("checked in offset_into_value that it is not negative")
    };

    // When the feature flag is set, reject AS OF queries that contain
    // unmaterializable functions other than temporal ones.
    if scx.is_feature_flag_enabled(&DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF)
        && select.as_of.is_some()
        && expr.contains_unmaterializable_except_temporal()?
    {
        bail_unsupported!("unmaterializable function (except `mz_now`) in an AS OF query");
    }

    let plan = SelectPlan {
        source: expr,
        when,
        finishing: RowSetFinishing {
            limit,
            offset,
            project: finishing.project,
            order_by: finishing.order_by,
        },
        copy_to,
        // Keep the original statement around alongside the plan.
        select: Some(Box::new(select)),
    };

    Ok((plan, desc))
}
305
306pub fn describe_explain_plan(
307 scx: &StatementContext,
308 explain: ExplainPlanStatement<Aug>,
309) -> Result<StatementDesc, PlanError> {
310 let mut relation_desc = RelationDesc::builder();
311
312 match explain.stage() {
313 ExplainStage::RawPlan => {
314 let name = "Raw Plan";
315 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
316 }
317 ExplainStage::DecorrelatedPlan => {
318 let name = "Decorrelated Plan";
319 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
320 }
321 ExplainStage::LocalPlan => {
322 let name = "Locally Optimized Plan";
323 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
324 }
325 ExplainStage::GlobalPlan => {
326 let name = "Optimized Plan";
327 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
328 }
329 ExplainStage::PhysicalPlan => {
330 let name = "Physical Plan";
331 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
332 }
333 ExplainStage::Trace => {
334 relation_desc = relation_desc
335 .with_column("Time", SqlScalarType::UInt64.nullable(false))
336 .with_column("Path", SqlScalarType::String.nullable(false))
337 .with_column("Plan", SqlScalarType::String.nullable(false));
338 }
339 ExplainStage::PlanInsights => {
340 let name = "Plan Insights";
341 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
342 }
343 };
344 let relation_desc = relation_desc.finish();
345
346 Ok(
347 StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
348 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
349 _ => vec![],
350 }),
351 )
352}
353
354pub fn describe_explain_pushdown(
355 scx: &StatementContext,
356 statement: ExplainPushdownStatement<Aug>,
357) -> Result<StatementDesc, PlanError> {
358 let relation_desc = RelationDesc::builder()
359 .with_column("Source", SqlScalarType::String.nullable(false))
360 .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
361 .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
362 .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
363 .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
364 .finish();
365
366 Ok(
367 StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
368 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
369 _ => vec![],
370 }),
371 )
372}
373
374pub fn describe_explain_analyze_object(
375 _scx: &StatementContext,
376 statement: ExplainAnalyzeObjectStatement<Aug>,
377) -> Result<StatementDesc, PlanError> {
378 if statement.as_sql {
379 let relation_desc = RelationDesc::builder()
380 .with_column("SQL", SqlScalarType::String.nullable(false))
381 .finish();
382 return Ok(StatementDesc::new(Some(relation_desc)));
383 }
384
385 match statement.properties {
386 ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
387 properties,
388 skew,
389 }) => {
390 let mut relation_desc = RelationDesc::builder()
391 .with_column("operator", SqlScalarType::String.nullable(false));
392
393 if skew {
394 relation_desc =
395 relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
396 }
397
398 let mut seen_properties = BTreeSet::new();
399 for property in properties {
400 if !seen_properties.insert(property) {
402 continue;
403 }
404
405 match property {
406 ExplainAnalyzeComputationProperty::Memory if skew => {
407 let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
408 relation_desc = relation_desc
409 .with_column("memory_ratio", numeric.clone())
410 .with_column("worker_memory", SqlScalarType::String.nullable(true))
411 .with_column("avg_memory", SqlScalarType::String.nullable(true))
412 .with_column("total_memory", SqlScalarType::String.nullable(true))
413 .with_column("records_ratio", numeric.clone())
414 .with_column("worker_records", numeric.clone())
415 .with_column("avg_records", numeric.clone())
416 .with_column("total_records", numeric);
417 }
418 ExplainAnalyzeComputationProperty::Memory => {
419 relation_desc = relation_desc
420 .with_column("total_memory", SqlScalarType::String.nullable(true))
421 .with_column(
422 "total_records",
423 SqlScalarType::Numeric { max_scale: None }.nullable(true),
424 );
425 }
426 ExplainAnalyzeComputationProperty::Cpu => {
427 if skew {
428 relation_desc = relation_desc
429 .with_column(
430 "cpu_ratio",
431 SqlScalarType::Numeric { max_scale: None }.nullable(true),
432 )
433 .with_column(
434 "worker_elapsed",
435 SqlScalarType::Interval.nullable(true),
436 )
437 .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
438 }
439 relation_desc = relation_desc
440 .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
441 }
442 }
443 }
444
445 let relation_desc = relation_desc.finish();
446 Ok(StatementDesc::new(Some(relation_desc)))
447 }
448 ExplainAnalyzeProperty::Hints => {
449 let relation_desc = RelationDesc::builder()
450 .with_column("operator", SqlScalarType::String.nullable(true))
451 .with_column("levels", SqlScalarType::Int64.nullable(true))
452 .with_column("to_cut", SqlScalarType::Int64.nullable(true))
453 .with_column("hint", SqlScalarType::Float64.nullable(true))
454 .with_column("savings", SqlScalarType::String.nullable(true))
455 .finish();
456 Ok(StatementDesc::new(Some(relation_desc)))
457 }
458 }
459}
460
461pub fn describe_explain_analyze_cluster(
462 _scx: &StatementContext,
463 statement: ExplainAnalyzeClusterStatement,
464) -> Result<StatementDesc, PlanError> {
465 if statement.as_sql {
466 let relation_desc = RelationDesc::builder()
467 .with_column("SQL", SqlScalarType::String.nullable(false))
468 .finish();
469 return Ok(StatementDesc::new(Some(relation_desc)));
470 }
471
472 let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
473
474 let mut relation_desc = RelationDesc::builder()
475 .with_column("object", SqlScalarType::String.nullable(false))
476 .with_column("global_id", SqlScalarType::String.nullable(false));
477
478 if skew {
479 relation_desc =
480 relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
481 }
482
483 let mut seen_properties = BTreeSet::new();
484 for property in properties {
485 if !seen_properties.insert(property) {
487 continue;
488 }
489
490 match property {
491 ExplainAnalyzeComputationProperty::Memory if skew => {
492 let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
493 relation_desc = relation_desc
494 .with_column("max_operator_memory_ratio", numeric.clone())
495 .with_column("worker_memory", SqlScalarType::String.nullable(true))
496 .with_column("avg_memory", SqlScalarType::String.nullable(true))
497 .with_column("total_memory", SqlScalarType::String.nullable(true))
498 .with_column("max_operator_records_ratio", numeric.clone())
499 .with_column("worker_records", numeric.clone())
500 .with_column("avg_records", numeric.clone())
501 .with_column("total_records", numeric);
502 }
503 ExplainAnalyzeComputationProperty::Memory => {
504 relation_desc = relation_desc
505 .with_column("total_memory", SqlScalarType::String.nullable(true))
506 .with_column(
507 "total_records",
508 SqlScalarType::Numeric { max_scale: None }.nullable(true),
509 );
510 }
511 ExplainAnalyzeComputationProperty::Cpu if skew => {
512 relation_desc = relation_desc
513 .with_column(
514 "max_operator_cpu_ratio",
515 SqlScalarType::Numeric { max_scale: None }.nullable(true),
516 )
517 .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
518 .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
519 .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
520 }
521 ExplainAnalyzeComputationProperty::Cpu => {
522 relation_desc = relation_desc
523 .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
524 }
525 }
526 }
527
528 Ok(StatementDesc::new(Some(relation_desc.finish())))
529}
530
531pub fn describe_explain_timestamp(
532 scx: &StatementContext,
533 ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
534) -> Result<StatementDesc, PlanError> {
535 let relation_desc = RelationDesc::builder()
536 .with_column("Timestamp", SqlScalarType::String.nullable(false))
537 .finish();
538
539 Ok(StatementDesc::new(Some(relation_desc))
540 .with_params(describe_select(scx, select)?.param_types))
541}
542
543pub fn describe_explain_schema(
544 _: &StatementContext,
545 ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
546) -> Result<StatementDesc, PlanError> {
547 let relation_desc = RelationDesc::builder()
548 .with_column("Schema", SqlScalarType::String.nullable(false))
549 .finish();
550 Ok(StatementDesc::new(Some(relation_desc)))
551}
552
// Typed extraction of `EXPLAIN ... WITH (<options>)`: generates the
// `ExplainPlanOptionExtracted` struct with one field per option name, using
// the given default when an option is not specified. `Option<bool>` fields
// distinguish "unset" from an explicit true/false.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
595
/// Converts the extracted `WITH (...)` options into an [`ExplainConfig`].
impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
    type Error = PlanError;

    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
        // `RAW` is shorthand for `RAW PLANS` plus `RAW SYNTAX`.
        if v.raw {
            v.raw_plans = true;
            v.raw_syntax = true;
        }

        // Several diagnostics default to "on" when soft assertions are
        // disabled (i.e. production builds), unless explicitly overridden.
        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();

        Ok(ExplainConfig {
            arity: v.arity.unwrap_or(enable_on_prod),
            cardinality: v.cardinality,
            column_names: v.column_names,
            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
            // Humanized expressions are incompatible with raw plans.
            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
            join_impls: v.join_implementations,
            keys: v.keys,
            // Linear chains are likewise suppressed for raw plans.
            linear_chains: !v.raw_plans && v.linear_chains,
            no_fast_path: v.no_fast_path,
            no_notices: v.no_notices,
            node_ids: v.node_identifiers,
            non_negative: v.non_negative,
            raw_plans: v.raw_plans,
            raw_syntax: v.raw_syntax,
            // Set by the caller based on the requested output format.
            verbose_syntax: false,
            redacted: v.redacted,
            subtree_size: v.subtree_size,
            equivalences: v.equivalences,
            timing: v.timing,
            types: v.types,
            // Optimizer feature overrides: only the options surfaced above
            // are forwarded; everything else keeps its default.
            features: OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: Default::default(),
                enable_eager_delta_joins: v.enable_eager_delta_joins,
                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
                enable_consolidate_after_union_negate: Default::default(),
                enable_reduce_mfp_fusion: Default::default(),
                enable_cardinality_estimates: Default::default(),
                persist_fast_path_limit: Default::default(),
                reoptimize_imported_views: v.reoptimize_imported_views,
                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse: v
                    .enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: Default::default(),
                enable_dequadratic_eqprop_map: Default::default(),
                enable_eq_classes_withholding_errors: Default::default(),
                enable_fast_path_plan_insights: Default::default(),
                enable_cast_elimination: Default::default(),
                enable_case_literal_transform: Default::default(),
                enable_simplify_quantified_comparisons: Default::default(),
                enable_coalesce_case_transform: Default::default(),
            },
        })
    }
}
659
660fn plan_explainee(
661 scx: &StatementContext,
662 explainee: Explainee<Aug>,
663 params: &Params,
664) -> Result<plan::Explainee, PlanError> {
665 use crate::plan::ExplaineeStatement;
666
667 let is_replan = matches!(
668 explainee,
669 Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
670 );
671
672 let explainee = match explainee {
673 Explainee::View(name) | Explainee::ReplanView(name) => {
674 let item = scx.get_item_by_resolved_name(&name)?;
675 let item_type = item.item_type();
676 if item_type != CatalogItemType::View {
677 sql_bail!("Expected {name} to be a view, not a {item_type}");
678 }
679 match is_replan {
680 true => crate::plan::Explainee::ReplanView(item.id()),
681 false => crate::plan::Explainee::View(item.id()),
682 }
683 }
684 Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
685 let item = scx.get_item_by_resolved_name(&name)?;
686 let item_type = item.item_type();
687 if item_type != CatalogItemType::MaterializedView {
688 sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
689 }
690 match is_replan {
691 true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
692 false => crate::plan::Explainee::MaterializedView(item.id()),
693 }
694 }
695 Explainee::Index(name) | Explainee::ReplanIndex(name) => {
696 let item = scx.get_item_by_resolved_name(&name)?;
697 let item_type = item.item_type();
698 if item_type != CatalogItemType::Index {
699 sql_bail!("Expected {name} to be an index, not a {item_type}");
700 }
701 match is_replan {
702 true => crate::plan::Explainee::ReplanIndex(item.id()),
703 false => crate::plan::Explainee::Index(item.id()),
704 }
705 }
706 Explainee::Select(select, broken) => {
707 let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
708 crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
709 }
710 Explainee::CreateView(mut stmt, broken) => {
711 if stmt.if_exists != IfExistsBehavior::Skip {
712 stmt.if_exists = IfExistsBehavior::Skip;
717 } else {
718 sql_bail!(
719 "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
720 (the behavior is implied within the scope of an enclosing EXPLAIN)"
721 );
722 }
723
724 let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
725 sql_bail!("expected CreateViewPlan plan");
726 };
727
728 crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
729 }
730 Explainee::CreateMaterializedView(mut stmt, broken) => {
731 if stmt.if_exists != IfExistsBehavior::Skip {
732 stmt.if_exists = IfExistsBehavior::Skip;
737 } else {
738 sql_bail!(
739 "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
740 (the behavior is implied within the scope of an enclosing EXPLAIN)"
741 );
742 }
743
744 let Plan::CreateMaterializedView(plan) =
745 ddl::plan_create_materialized_view(scx, *stmt)?
746 else {
747 sql_bail!("expected CreateMaterializedViewPlan plan");
748 };
749
750 crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
751 broken,
752 plan,
753 })
754 }
755 Explainee::CreateIndex(mut stmt, broken) => {
756 if !stmt.if_not_exists {
757 stmt.if_not_exists = true;
760 } else {
761 sql_bail!(
762 "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
763 (the behavior is implied within the scope of an enclosing EXPLAIN)"
764 );
765 }
766
767 let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
768 sql_bail!("expected CreateIndexPlan plan");
769 };
770
771 crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
772 }
773 Explainee::Subscribe(stmt, broken) => {
774 let Plan::Subscribe(plan) = plan_subscribe(scx, *stmt, params, None)? else {
775 sql_bail!("expected SubscribePlan");
776 };
777 crate::plan::Explainee::Statement(ExplaineeStatement::Subscribe { broken, plan })
778 }
779 };
780
781 Ok(explainee)
782}
783
784pub fn plan_explain_plan(
785 scx: &StatementContext,
786 explain: ExplainPlanStatement<Aug>,
787 params: &Params,
788) -> Result<Plan, PlanError> {
789 let (format, verbose_syntax) = match explain.format() {
790 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
791 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
792 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
793 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
794 };
795 let stage = explain.stage();
796
797 let mut config = {
799 let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
800
801 if !scx.catalog.system_vars().persist_stats_filter_enabled() {
802 with_options.filter_pushdown = Some(false);
804 }
805
806 ExplainConfig::try_from(with_options)?
807 };
808 config.verbose_syntax = verbose_syntax;
809
810 let explainee = plan_explainee(scx, explain.explainee, params)?;
811
812 Ok(Plan::ExplainPlan(ExplainPlanPlan {
813 stage,
814 format,
815 config,
816 explainee,
817 }))
818}
819
/// Plans `EXPLAIN SCHEMA FOR CREATE SINK ...` by planning the sink statement
/// and extracting the Avro schema it would use.
///
/// Only Kafka sinks with an Avro value format are supported; asking for the
/// key schema of a keyless sink is an error.
pub fn plan_explain_schema(
    scx: &StatementContext,
    explain_schema: ExplainSinkSchemaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let ExplainSinkSchemaStatement {
        schema_for,
        format: _,
        mut statement,
    } = explain_schema;

    // Give the sink statement a placeholder name so it can be planned.
    statement.name = Some(UnresolvedItemName::qualified(&[
        ident!("mz_catalog"),
        ident!("mz_explain_schema"),
    ]));

    // Resolve any AVRO DOC ON options on the sink's format before planning.
    crate::pure::purify_create_sink_avro_doc_on_options(
        scx.catalog,
        *statement.from.item_id(),
        &mut statement.format,
    )?;

    match ddl::plan_create_sink(scx, statement)? {
        // The nested pattern only matches Kafka sinks whose value format is
        // Avro; anything else falls to the unsupported arm below.
        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
            StorageSinkConnection::Kafka(KafkaSinkConnection {
                format:
                    KafkaSinkFormat {
                        key_format,
                        value_format:
                            KafkaSinkFormatType::Avro {
                                schema: value_schema,
                                ..
                            },
                        ..
                    },
                ..
            }) => {
                let schema = match schema_for {
                    // A key schema exists only if the sink has an Avro key
                    // format.
                    ExplainSinkSchemaFor::Key => key_format
                        .and_then(|f| match f {
                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
                            _ => None,
                        })
                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
                    ExplainSinkSchemaFor::Value => value_schema,
                };

                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
                    sink_from: sink.from,
                    json_schema: schema,
                }))
            }
            _ => bail_unsupported!(
                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
            ),
        },
        _ => bail_internal!("plan_sink did not produce a CreateSink plan"),
    }
}
882
883pub fn plan_explain_pushdown(
884 scx: &StatementContext,
885 statement: ExplainPushdownStatement<Aug>,
886 params: &Params,
887) -> Result<Plan, PlanError> {
888 scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
889 let explainee = plan_explainee(scx, statement.explainee, params)?;
890 Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
891}
892
893pub fn plan_explain_analyze_object(
894 scx: &StatementContext,
895 statement: ExplainAnalyzeObjectStatement<Aug>,
896 params: &Params,
897) -> Result<Plan, PlanError> {
898 let explainee_name = statement
899 .explainee
900 .name()
901 .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
902 .full_name_str();
903 let explainee = plan_explainee(scx, statement.explainee, params)?;
904
905 match explainee {
906 plan::Explainee::Index(_index_id) => (),
907 plan::Explainee::MaterializedView(_item_id) => (),
908 _ => {
909 return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
910 }
911 };
912
913 let mut ctes = Vec::with_capacity(4); let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
928 let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
929 let mut predicates = vec![format!(
930 "mo.name = {}",
931 escaped_string_literal(&explainee_name)
932 )];
933 let mut order_by = vec!["mlm.lir_id DESC"];
934
935 match statement.properties {
936 ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
937 properties,
938 skew,
939 }) => {
940 let mut worker_id = None;
941 let mut seen_properties = BTreeSet::new();
942 for property in properties {
943 if !seen_properties.insert(property) {
945 continue;
946 }
947
948 match property {
949 ExplainAnalyzeComputationProperty::Memory => {
950 ctes.push((
951 "summary_memory",
952 r#"
953 SELECT mlm.global_id AS global_id,
954 mlm.lir_id AS lir_id,
955 SUM(mas.size) AS total_memory,
956 SUM(mas.records) AS total_records,
957 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
958 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
959 FROM mz_introspection.mz_lir_mapping mlm
960 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
961 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
962 ON (mas.operator_id = valid_id)
963GROUP BY mlm.global_id, mlm.lir_id"#,
964 ));
965 from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");
966
967 if skew {
968 ctes.push((
969 "per_worker_memory",
970 r#"
971 SELECT mlm.global_id AS global_id,
972 mlm.lir_id AS lir_id,
973 mas.worker_id AS worker_id,
974 SUM(mas.size) AS worker_memory,
975 SUM(mas.records) AS worker_records
976 FROM mz_introspection.mz_lir_mapping mlm
977 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
978 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
979 ON (mas.operator_id = valid_id)
980GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
981 ));
982 from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");
983
984 if let Some(worker_id) = worker_id {
985 predicates.push(format!("pwm.worker_id = {worker_id}"));
986 } else {
987 worker_id = Some("pwm.worker_id");
988 columns.push("pwm.worker_id AS worker_id");
989 order_by.push("worker_id");
990 }
991
992 columns.extend([
993 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
994 "pg_size_pretty(pwm.worker_memory) AS worker_memory",
995 "pg_size_pretty(sm.avg_memory) AS avg_memory",
996 "pg_size_pretty(sm.total_memory) AS total_memory",
997 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
998 "pwm.worker_records AS worker_records",
999 "sm.avg_records AS avg_records",
1000 "sm.total_records AS total_records",
1001 ]);
1002 } else {
1003 columns.extend([
1004 "pg_size_pretty(sm.total_memory) AS total_memory",
1005 "sm.total_records AS total_records",
1006 ]);
1007 }
1008 }
1009 ExplainAnalyzeComputationProperty::Cpu => {
1010 ctes.push((
1011 "summary_cpu",
1012 r#"
1013 SELECT mlm.global_id AS global_id,
1014 mlm.lir_id AS lir_id,
1015 SUM(mse.elapsed_ns) AS total_ns,
1016 CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
1017 FROM mz_introspection.mz_lir_mapping mlm
1018 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1019 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1020 ON (mse.id = valid_id)
1021GROUP BY mlm.global_id, mlm.lir_id"#,
1022 ));
1023 from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");
1024
1025 if skew {
1026 ctes.push((
1027 "per_worker_cpu",
1028 r#"
1029 SELECT mlm.global_id AS global_id,
1030 mlm.lir_id AS lir_id,
1031 mse.worker_id AS worker_id,
1032 SUM(mse.elapsed_ns) AS worker_ns
1033 FROM mz_introspection.mz_lir_mapping mlm
1034 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1035 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1036 ON (mse.id = valid_id)
1037GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1038 ));
1039 from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");
1040
1041 if let Some(worker_id) = worker_id {
1042 predicates.push(format!("pwc.worker_id = {worker_id}"));
1043 } else {
1044 worker_id = Some("pwc.worker_id");
1045 columns.push("pwc.worker_id AS worker_id");
1046 order_by.push("worker_id");
1047 }
1048
1049 columns.extend([
1050 "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
1051 "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
1052 "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
1053 ]);
1054 }
1055 columns.push(
1056 "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
1057 );
1058 }
1059 }
1060 }
1061 }
1062 ExplainAnalyzeProperty::Hints => {
1063 columns.extend([
1064 "megsa.levels AS levels",
1065 "megsa.to_cut AS to_cut",
1066 "megsa.hint AS hint",
1067 "pg_size_pretty(megsa.savings) AS savings",
1068 ]);
1069 from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
1070 "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
1071 mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
1072 }
1073 }
1074
1075 from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");
1076
1077 let ctes = if !ctes.is_empty() {
1078 format!(
1079 "WITH {}",
1080 separated(
1081 ",\n",
1082 ctes.iter()
1083 .map(|(name, defn)| format!("{name} AS ({defn})"))
1084 )
1085 )
1086 } else {
1087 String::new()
1088 };
1089 let columns = separated(", ", columns);
1090 let from = separated(" ", from);
1091 let predicates = separated(" AND ", predicates);
1092 let order_by = separated(", ", order_by);
1093 let query = format!(
1094 r#"{ctes}
1095SELECT {columns}
1096FROM {from}
1097WHERE {predicates}
1098ORDER BY {order_by}"#
1099 );
1100
1101 if statement.as_sql {
1102 let rows = vec![Row::pack_slice(&[Datum::String(
1103 &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1104 PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1105 })?,
1106 )])];
1107 let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1108
1109 Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1110 } else {
1111 let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1112 show_select.plan()
1113 }
1114}
1115
/// Plans `EXPLAIN ANALYZE ... FOR CLUSTER`, which reports per-object memory
/// and/or CPU usage — optionally with per-worker skew detail — by generating
/// a SQL query over `mz_introspection` relations and planning that query.
///
/// When `statement.as_sql` is set, the generated SQL is returned as a single
/// pretty-printed string row instead of being planned for execution.
pub fn plan_explain_analyze_cluster(
    scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
    _params: &Params,
) -> Result<Plan, PlanError> {
    // Accumulators for the pieces of the generated query: CTEs, SELECT
    // columns, FROM/JOIN clauses, WHERE predicates, and ORDER BY keys.
    let mut ctes = Vec::with_capacity(4); let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
    let mut predicates = vec![];
    let mut order_by = vec![];

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
    // Tracks which alias (if any) already exposes `worker_id`, so that a
    // second property joins against the same worker rather than cross-joining.
    let mut worker_id = None;
    let mut seen_properties = BTreeSet::new();
    for property in properties {
        // Process each property (MEMORY / CPU) at most once, even if repeated.
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            ExplainAnalyzeComputationProperty::Memory => {
                if skew {
                    // Only order by worker_id if this property introduced it.
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        // Another property already exposes worker_id; align.
                        predicates.push(format!("om.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("om.worker_id");
                        columns.push("om.worker_id AS worker_id");
                        set_worker_id = true; };

                    // Per-(object, operator) totals and per-worker averages.
                    ctes.push((
                        "per_operator_memory_summary",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mas.size) AS total_memory,
       SUM(mas.records) AS total_records,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
     ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    // Per-(object, operator, worker) memory/record totals.
                    ctes.push((
                        "per_operator_memory_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mas.worker_id AS worker_id,
       SUM(mas.size) AS worker_memory,
       SUM(mas.records) AS worker_records
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
     ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                    ));

                    // Skew ratios: each worker's share relative to the
                    // per-worker average (NULL when the average is 0).
                    ctes.push((
                        "per_operator_memory_ratios",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.lir_id AS lir_id,
       pompw.worker_id AS worker_id,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
  FROM per_operator_memory_per_worker pompw
  JOIN per_operator_memory_summary poms
 USING (global_id, lir_id)
"#,
                    ));

                    // Roll operator-level data up to (object, worker),
                    // keeping the worst (max) per-operator skew ratios.
                    ctes.push((
                        "object_memory",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.worker_id AS worker_id,
       MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
       MAX(pomr.records_ratio) AS max_operator_records_ratio,
       SUM(pompw.worker_memory) AS worker_memory,
       SUM(pompw.worker_records) AS worker_records
FROM per_operator_memory_per_worker pompw
     JOIN per_operator_memory_ratios pomr
     USING (global_id, worker_id, lir_id)
GROUP BY pompw.global_id, pompw.worker_id
"#,
                    ));

                    // Object-level totals and per-worker averages.
                    ctes.push(("object_average_memory", r#"
SELECT om.global_id AS global_id,
       SUM(om.worker_memory) AS total_memory,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
       SUM(om.worker_records) AS total_records,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
  FROM object_memory om
GROUP BY om.global_id"#));

                    from.push("LEFT JOIN object_memory om USING (global_id)");
                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");

                    columns.extend([
                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
                        "pg_size_pretty(om.worker_memory) AS worker_memory",
                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
                        "pg_size_pretty(oam.total_memory) AS total_memory",
                        "om.max_operator_records_ratio AS max_operator_records_ratio",
                        "om.worker_records AS worker_records",
                        "oam.avg_records AS avg_records",
                        "oam.total_records AS total_records",
                    ]);

                    // Most-skewed, largest consumers first.
                    order_by.extend([
                        "max_operator_memory_ratio DESC",
                        "max_operator_records_ratio DESC",
                        "om.worker_memory DESC",
                        "worker_records DESC",
                    ]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // No skew requested: only object-level memory totals.
                    ctes.push((
                        "per_operator_memory_totals",
                        r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         SUM(mas.size) AS total_memory,
         SUM(mas.records) AS total_records
    FROM mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
         JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
         ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_memory_totals",
                        r#"
SELECT pomt.global_id AS global_id,
       SUM(pomt.total_memory) AS total_memory,
       SUM(pomt.total_records) AS total_records
FROM per_operator_memory_totals pomt
GROUP BY pomt.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
                    columns.extend([
                        "pg_size_pretty(omt.total_memory) AS total_memory",
                        "omt.total_records AS total_records",
                    ]);
                    order_by.extend(["omt.total_memory DESC", "total_records DESC"]);
                }
            }
            ExplainAnalyzeComputationProperty::Cpu => {
                if skew {
                    // Mirrors the MEMORY skew branch, but over scheduling
                    // (elapsed nanoseconds) introspection data.
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        predicates.push(format!("oc.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("oc.worker_id");
                        columns.push("oc.worker_id AS worker_id");
                        set_worker_id = true; };

                    ctes.push((
                        "per_operator_cpu_summary",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mse.elapsed_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
FROM mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
     ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
));

                    ctes.push((
                        "per_operator_cpu_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mse.worker_id AS worker_id,
       SUM(mse.elapsed_ns) AS worker_ns
FROM mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
     ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                    ));

                    ctes.push((
                        "per_operator_cpu_ratios",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.lir_id AS lir_id,
       pocpw.worker_id AS worker_id,
       CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
FROM per_operator_cpu_per_worker pocpw
     JOIN per_operator_cpu_summary pocs
     USING (global_id, lir_id)
"#,
                    ));

                    ctes.push((
                        "object_cpu",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.worker_id AS worker_id,
       MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
       SUM(pocpw.worker_ns) AS worker_ns
FROM per_operator_cpu_per_worker pocpw
     JOIN per_operator_cpu_ratios pomr
     USING (global_id, worker_id, lir_id)
GROUP BY pocpw.global_id, pocpw.worker_id
"#,
                    ));

                    ctes.push((
                        "object_average_cpu",
                        r#"
SELECT oc.global_id AS global_id,
       SUM(oc.worker_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
  FROM object_cpu oc
GROUP BY oc.global_id"#,));

                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");

                    // Nanoseconds are rendered as INTERVALs for readability.
                    columns.extend([
                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
                    ]);

                    order_by.extend(["max_operator_cpu_ratio DESC", "worker_elapsed DESC"]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // No skew requested: only object-level elapsed time.
                    ctes.push((
                        "per_operator_cpu_totals",
                        r#"
  SELECT mlm.global_id AS global_id,
         mlm.lir_id AS lir_id,
         SUM(mse.elapsed_ns) AS total_ns
    FROM mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
         JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
         ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_cpu_totals",
                        r#"
SELECT poct.global_id AS global_id,
       SUM(poct.total_ns) AS total_ns
FROM per_operator_cpu_totals poct
GROUP BY poct.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
                    columns
                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
                    order_by.extend(["total_elapsed DESC"]);
                }
            }
        }
    }

    // Assemble the final query text from the accumulated pieces.
    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    let predicates = if !predicates.is_empty() {
        format!("WHERE {}", separated(" AND ", predicates))
    } else {
        String::new()
    };
    // Final tie-breaker so output order is deterministic.
    order_by.push("mo.name DESC");
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
{predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // AS SQL: return the generated query as a pretty-printed string.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        // Otherwise plan the generated query like a SHOW-style SELECT.
        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        show_select.plan()
    }
}
1481
1482pub fn plan_explain_timestamp(
1483 scx: &StatementContext,
1484 explain: ExplainTimestampStatement<Aug>,
1485) -> Result<Plan, PlanError> {
1486 let (format, _verbose_syntax) = match explain.format() {
1487 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1488 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1489 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1490 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1491 };
1492
1493 let raw_plan = {
1494 let query::PlannedRootQuery {
1495 expr: raw_plan,
1496 desc: _,
1497 finishing: _,
1498 scope: _,
1499 } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1500 if raw_plan.contains_parameters()? {
1501 return Err(PlanError::ParameterNotAllowed(
1502 "EXPLAIN TIMESTAMP".to_string(),
1503 ));
1504 }
1505
1506 raw_plan
1507 };
1508 let when = query::plan_as_of(scx, explain.select.as_of)?;
1509
1510 Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1511 format,
1512 raw_plan,
1513 when,
1514 }))
1515}
1516
// Typed extraction of SUBSCRIBE options: generates `SubscribeOptionExtracted`
// with optional `snapshot` and `progress` booleans.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1518
/// Computes the [`StatementDesc`] for a `SUBSCRIBE` statement.
///
/// The output always begins with an `mz_timestamp` column; `PROGRESS` adds
/// `mz_progressed` and makes upstream columns nullable. The remaining shape
/// depends on the output mode: plain diffs add `mz_diff`, while the UPSERT
/// and DEBEZIUM envelopes add `mz_state`, the key columns, and (nullable)
/// value columns — DEBEZIUM with both `before_` and `after_` variants.
pub fn describe_subscribe(
    scx: &StatementContext,
    stmt: SubscribeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // Resolve the relation being subscribed to, either a named item that has
    // a relation description or an inline query.
    let relation_desc = match stmt.relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            match item.relation_desc() {
                Some(desc) => desc.into_owned(),
                None => sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                ),
            }
        }
        SubscribeRelation::Query(query) => {
            let query::PlannedRootQuery { desc, .. } =
                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
            desc
        }
    };
    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
    let progress = progress.unwrap_or(false);
    // Every SUBSCRIBE output starts with a non-null mz_timestamp column.
    let mut desc = RelationDesc::builder().with_column(
        "mz_timestamp",
        SqlScalarType::Numeric {
            max_scale: Some(NumericMaxScale::ZERO),
        }
        .nullable(false),
    );
    if progress {
        desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
    }

    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
    match stmt.output {
        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
            desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
            for (name, mut ty) in relation_desc.into_iter() {
                // Progress rows carry no data, so data columns become nullable.
                if progress {
                    ty.nullable = true;
                }
                desc = desc.with_column(name, ty);
            }
        }
        SubscribeOutput::EnvelopeUpsert { key_columns }
        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
            desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
            let key_columns = key_columns
                .into_iter()
                .map(normalize::column_name)
                .collect_vec();
            let mut before_values_desc = RelationDesc::builder();
            let mut after_values_desc = RelationDesc::builder();

            // Key columns come first, in the user-specified order.
            for column_name in &key_columns {
                let mut column_ty = relation_desc
                    .get_by_name(column_name)
                    .map(|(_pos, ty)| ty.clone())
                    .ok_or_else(|| PlanError::UnknownColumn {
                        table: None,
                        column: column_name.clone(),
                        similar: Box::new([]),
                    })?;
                if progress {
                    column_ty.nullable = true;
                }
                desc = desc.with_column(column_name, column_ty);
            }

            // Non-key columns are always nullable (absent on deletes/progress).
            for (mut name, mut ty) in relation_desc
                .into_iter()
                .filter(|(name, _ty)| !key_columns.contains(name))
            {
                ty.nullable = true;
                before_values_desc =
                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
                if debezium {
                    name = format!("after_{}", name).into();
                }
                after_values_desc = after_values_desc.with_column(name, ty);
            }

            // DEBEZIUM emits before- and after-images; UPSERT only the latter.
            if debezium {
                desc = desc.concat(before_values_desc);
            }
            desc = desc.concat(after_values_desc);
        }
    }
    Ok(StatementDesc::new(Some(desc.finish())))
}
1614
/// Plans a `SUBSCRIBE` statement into a [`SubscribePlan`].
///
/// Determines the source (a named item or an inline query), resolves the
/// `AS OF` / `UP TO` frontiers, and plans the requested output envelope
/// (diffs, UPSERT, DEBEZIUM, or `WITHIN TIMESTAMP ORDER BY`). `copy_to`
/// is set when this SUBSCRIBE is the body of a `COPY ... TO STDOUT`.
pub fn plan_subscribe(
    scx: &StatementContext,
    SubscribeStatement {
        relation,
        options,
        as_of,
        up_to,
        output,
    }: SubscribeStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<Plan, PlanError> {
    let (from, desc, scope) = match relation {
        SubscribeRelation::Name(name) => {
            // Named source: must be an item with a relation description.
            let item = scx.get_item_by_resolved_name(&name)?;
            let Some(desc) = item.relation_desc() else {
                sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                );
            };
            let item_name = match name {
                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
                _ => None,
            };
            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
            (
                SubscribeFrom::Id(item.global_id()),
                desc.into_owned(),
                scope,
            )
        }
        SubscribeRelation::Query(query) => {
            // Inline query: plan it and bind any parameters now.
            #[allow(deprecated)] let query::PlannedRootQuery {
                mut expr,
                desc,
                finishing,
                scope,
            } = query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
            expr.bind_parameters(scx, QueryLifetime::Subscribe, params)?;
            let query = query::PlannedRootQuery {
                expr,
                desc,
                finishing,
                scope,
            };
            // SUBSCRIBE queries must not carry a nontrivial finishing
            // (LIMIT/OFFSET/ORDER BY are handled elsewhere or rejected).
            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
                &query.finishing,
                query.desc.arity()
            ));
            let desc = query.desc.clone();
            (
                SubscribeFrom::Query {
                    expr: query.expr,
                    desc: query.desc,
                },
                desc,
                query.scope,
            )
        }
    };

    let when = query::plan_as_of(scx, as_of)?;
    let up_to = up_to
        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
        .transpose()?;

    // Expression context for planning envelope key / ORDER BY expressions
    // against the subscribed relation's columns.
    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
    let ecx = ExprContext {
        qcx: &qcx,
        name: "",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
    let output = match output {
        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
        SubscribeOutput::EnvelopeUpsert { key_columns } => {
            // Key columns are planned as an ORDER BY over existing columns;
            // any expression requiring new columns (map_exprs) is invalid.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE UPSERT KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
            }
            plan::SubscribeOutput::EnvelopeUpsert {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::EnvelopeDebezium { key_columns } => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
            // Same keys-as-ORDER-BY scheme as ENVELOPE UPSERT above.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE DEBEZIUM KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
            }
            plan::SubscribeOutput::EnvelopeDebezium {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
            // The ORDER BY may also reference mz_diff, which is prepended as
            // column 0 ahead of the relation's own columns.
            let mz_diff = "mz_diff".into();
            let output_columns = std::iter::once((0, &mz_diff))
                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
                .collect_vec();
            match query::plan_order_by_exprs(
                &ExprContext {
                    name: "WITHIN TIMESTAMP ORDER BY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            ) {
                // A complaint specifically about mz_diff gets the dedicated
                // "invalid ORDER BY" error rather than "unknown column".
                Err(PlanError::UnknownColumn {
                    table: None,
                    column,
                    similar: _,
                }) if &column == &mz_diff => {
                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                }
                Err(e) => return Err(e),
                Ok((order_by, map_exprs)) => {
                    if !map_exprs.is_empty() {
                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                    }

                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
                }
            }
        }
    };

    let SubscribeOptionExtracted {
        progress, snapshot, ..
    } = options.try_into()?;
    Ok(Plan::Subscribe(SubscribePlan {
        from,
        when,
        up_to,
        // SNAPSHOT defaults to true, PROGRESS to false.
        with_snapshot: snapshot.unwrap_or(true),
        copy_to,
        emit_progress: progress.unwrap_or(false),
        output,
    }))
}
1799
1800pub fn describe_copy_from_table(
1801 scx: &StatementContext,
1802 table_name: <Aug as AstInfo>::ItemName,
1803 columns: Vec<Ident>,
1804) -> Result<StatementDesc, PlanError> {
1805 let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1806 Ok(StatementDesc::new(Some(desc)))
1807}
1808
1809pub fn describe_copy_item(
1810 scx: &StatementContext,
1811 object_name: <Aug as AstInfo>::ItemName,
1812 columns: Vec<Ident>,
1813) -> Result<StatementDesc, PlanError> {
1814 let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1815 Ok(StatementDesc::new(Some(desc)))
1816}
1817
1818pub fn describe_copy(
1819 scx: &StatementContext,
1820 CopyStatement {
1821 relation,
1822 direction,
1823 ..
1824 }: CopyStatement<Aug>,
1825) -> Result<StatementDesc, PlanError> {
1826 Ok(match (relation, direction) {
1827 (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1828 describe_copy_item(scx, name, columns)?
1829 }
1830 (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1831 describe_copy_from_table(scx, name, columns)?
1832 }
1833 (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1834 (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1835 }
1836 .with_is_copy())
1837}
1838
/// Plans `COPY (SELECT ...) TO <expr>`, which sinks query results to S3 via
/// an AWS connection in either CSV or Parquet format.
///
/// `to` is the target URI expression; it is planned and type-checked as a
/// string. `MAX FILE SIZE` is validated against the supported S3 sink range.
fn plan_copy_to_expr(
    scx: &StatementContext,
    select_plan: SelectPlan,
    desc: RelationDesc,
    to: &Expr<Aug>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // An AWS connection is mandatory, and is the only supported kind.
    let conn_id = match options.aws_connection {
        Some(conn_id) => CatalogItemId::from(conn_id),
        None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
    };
    let connection = scx.get_item(&conn_id).connection()?;

    match connection {
        mz_storage_types::connections::Connection::Aws(_) => {}
        _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
    }

    let format = match format {
        CopyFormat::Csv => {
            // Single-byte CSV framing parameters.
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            S3SinkFormat::PgCopy(CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            ))
        }
        CopyFormat::Parquet => {
            // Ensure every column type can be represented in Parquet.
            ArrowBuilder::validate_desc_for_parquet(&desc, |_| None)
                .map_err(|e| sql_err!("{}", e))?;
            S3SinkFormat::Parquet
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
    };

    // Plan the destination expression as a string in an empty scope (it may
    // not reference any columns, aggregates, subqueries, or parameters).
    let mut to_expr = to.clone();
    transform_ast::transform(scx, &mut to_expr)?;
    let relation_type = RelationDesc::empty();
    let ecx = &ExprContext {
        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
        name: "COPY TO target",
        scope: &Scope::empty(),
        relation_type: relation_type.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };

    let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;

    // Enforce the supported S3 object size bounds.
    if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
        sql_bail!(
            "MAX FILE SIZE cannot be less than {}",
            MIN_S3_SINK_FILE_SIZE
        );
    }
    if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
        sql_bail!(
            "MAX FILE SIZE cannot be greater than {}",
            MAX_S3_SINK_FILE_SIZE
        );
    }

    Ok(Plan::CopyTo(CopyToPlan {
        select_plan,
        desc,
        to,
        connection: connection.to_owned(),
        connection_id: conn_id,
        format,
        max_file_size: options.max_file_size.as_bytes(),
    }))
}
1925
/// Plans `COPY ... FROM`, ingesting data into a table from STDIN or — behind
/// a feature flag — from a remote source (URL, or S3 via an AWS connection).
///
/// Validates the format parameters (TEXT/CSV framing bytes, Parquet), the
/// `FILES`/`PATTERN` filter (remote sources only), and requires the target
/// table plan to include an MFP.
fn plan_copy_from(
    scx: &StatementContext,
    target: &CopyTarget<Aug>,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // Helper: reject options that only make sense in CSV mode.
    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
        match option {
            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
            None => Ok(()),
        }
    }

    let source = match target {
        CopyTarget::Stdin => CopyFromSource::Stdin,
        CopyTarget::Expr(from) => {
            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;

            // Plan the source expression as a string in an empty scope (no
            // columns, aggregates, subqueries, or parameters allowed).
            let mut from_expr = from.clone();
            transform_ast::transform(scx, &mut from_expr)?;
            let relation_type = RelationDesc::empty();
            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "COPY FROM target",
                scope: &Scope::empty(),
                relation_type: relation_type.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;

            // With an AWS connection the expression is an S3 URI; without
            // one it is treated as a plain URL.
            match options.aws_connection {
                Some(conn_id) => {
                    let conn_id = CatalogItemId::from(conn_id);

                    let connection = match scx.get_item(&conn_id).connection()? {
                        mz_storage_types::connections::Connection::Aws(conn) => conn,
                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
                    };

                    CopyFromSource::AwsS3 {
                        uri: from,
                        connection,
                        connection_id: conn_id,
                    }
                }
                None => CopyFromSource::Url(from),
            }
        }
        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
    };

    let params = match format {
        CopyFormat::Text => {
            only_available_with_csv(options.quote, "quote")?;
            only_available_with_csv(options.escape, "escape")?;
            only_available_with_csv(options.header, "HEADER")?;
            // TEXT defaults: tab delimiter, "\N" for NULL.
            let delimiter =
                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
            let null = match options.null {
                Some(null) => Cow::from(null),
                None => Cow::from("\\N"),
            };
            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
        }
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            )
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Parquet => CopyFormatParams::Parquet,
    };

    // FILES and PATTERN are mutually exclusive object filters.
    let filter = match (options.files, options.pattern) {
        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
        (Some(files), None) => Some(CopyFromFilter::Files(files)),
        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
        (None, None) => None,
    };

    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
    }

    let table_name_string = table_name.full_name_str();

    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;

    let Some(mfp) = maybe_mfp else {
        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
    };

    Ok(Plan::CopyFrom(CopyFromPlan {
        target_id: id,
        target_name: table_name_string,
        source,
        columns,
        source_desc,
        mfp,
        params,
        filter,
    }))
}
2046
2047fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2048 match v {
2049 Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2050 Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2051 None => Ok(None),
2052 }
2053}
2054
// Generates `CopyOptionExtracted`, a typed view of a COPY statement's
// `WITH (...)` options: one field per `(Name, Type[, Default])` tuple below.
// `CopyOptionExtracted::try_from(options)` performs the extraction (see
// `plan_copy`). NOTE(review): details of duplicate/unknown-option handling
// live in the macro definition — not visible here.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    // Only option with an explicit default (256 MB); all others default to None/false.
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
2068
2069pub fn plan_copy(
2070 scx: &StatementContext,
2071 CopyStatement {
2072 relation,
2073 direction,
2074 target,
2075 options,
2076 }: CopyStatement<Aug>,
2077) -> Result<Plan, PlanError> {
2078 let options = CopyOptionExtracted::try_from(options)?;
2079 let format = options
2082 .format
2083 .as_ref()
2084 .map(|format| match format.to_lowercase().as_str() {
2085 "text" => Ok(CopyFormat::Text),
2086 "csv" => Ok(CopyFormat::Csv),
2087 "binary" => Ok(CopyFormat::Binary),
2088 "parquet" => Ok(CopyFormat::Parquet),
2089 _ => sql_bail!("unknown FORMAT: {}", format),
2090 })
2091 .transpose()?;
2092
2093 match (&direction, &target) {
2094 (CopyDirection::To, CopyTarget::Stdout) => {
2095 if options.delimiter.is_some() {
2096 sql_bail!("COPY TO does not support DELIMITER option yet");
2097 }
2098 if options.quote.is_some() {
2099 sql_bail!("COPY TO does not support QUOTE option yet");
2100 }
2101 if options.null.is_some() {
2102 sql_bail!("COPY TO does not support NULL option yet");
2103 }
2104 match relation {
2105 CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
2106 CopyRelation::Select(stmt) => Ok(plan_select(
2107 scx,
2108 stmt,
2109 &Params::empty(),
2110 Some(format.unwrap_or(CopyFormat::Text)),
2111 )?),
2112 CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
2113 scx,
2114 stmt,
2115 &Params::empty(),
2116 Some(format.unwrap_or(CopyFormat::Text)),
2117 )?),
2118 }
2119 }
2120 (CopyDirection::From, target) => match relation {
2121 CopyRelation::Named { name, columns } => plan_copy_from(
2122 scx,
2123 target,
2124 name,
2125 columns,
2126 format.unwrap_or(CopyFormat::Text),
2127 options,
2128 ),
2129 _ => sql_bail!("COPY FROM {} not supported", target),
2130 },
2131 (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
2132 if !scx.catalog.active_role_id().is_system() {
2137 scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
2138 }
2139
2140 let format = match format {
2141 Some(inner) => inner,
2142 _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
2143 };
2144
2145 let stmt = match relation {
2146 CopyRelation::Named { name, columns } => {
2147 if !columns.is_empty() {
2148 sql_bail!(
2150 "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
2151 );
2152 }
2153 let query = Query {
2155 ctes: CteBlock::empty(),
2156 body: SetExpr::Table(name),
2157 order_by: vec![],
2158 limit: None,
2159 offset: None,
2160 };
2161 SelectStatement { query, as_of: None }
2162 }
2163 CopyRelation::Select(stmt) => {
2164 if !stmt.query.order_by.is_empty() {
2165 sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
2166 }
2167 stmt
2168 }
2169 CopyRelation::Subscribe(_) => {
2170 sql_bail!("COPY {} {} not supported", direction, target)
2171 }
2172 };
2173
2174 let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
2175 plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
2176 }
2177 _ => sql_bail!("COPY {} {} not supported", direction, target),
2178 }
2179}