1use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19use mz_arrow_util::builder::ArrowBuilder;
20use mz_expr::RowSetFinishing;
21use mz_expr::visit::Visit;
22use mz_ore::num::NonNeg;
23use mz_ore::soft_panic_or_log;
24use mz_ore::str::separated;
25use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
26use mz_repr::adt::numeric::NumericMaxScale;
27use mz_repr::bytes::ByteSize;
28use mz_repr::explain::{ExplainConfig, ExplainFormat};
29use mz_repr::optimize::OptimizerFeatureOverrides;
30use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
31use mz_sql_parser::ast::{
32 CteBlock, ExplainAnalyzeClusterStatement, ExplainAnalyzeComputationProperties,
33 ExplainAnalyzeComputationProperty, ExplainAnalyzeObjectStatement, ExplainAnalyzeProperty,
34 ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
35 ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
36 SetExpr, SubscribeOutput, UnresolvedItemName,
37};
38use mz_sql_parser::ident;
39use mz_storage_types::sinks::{
40 KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
41 MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
42};
43
44use crate::ast::display::AstDisplay;
45use crate::ast::{
46 AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
47 DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
48 SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
49 UpdateStatement,
50};
51use crate::catalog::CatalogItemType;
52use crate::names::{Aug, ResolvedItemName};
53use crate::normalize;
54use crate::plan::query::{
55 ExprContext, QueryLifetime, offset_into_value, plan_as_of_or_up_to, plan_expr,
56};
57use crate::plan::scope::Scope;
58use crate::plan::statement::show::ShowSelect;
59use crate::plan::statement::{StatementContext, StatementDesc, ddl};
60use crate::plan::{
61 self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
62 ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
63};
64use crate::plan::{
65 CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
66 QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
67};
68use crate::plan::{CopyFromSource, with_options};
69use crate::session::vars::{
70 self, DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF, ENABLE_COPY_FROM_REMOTE,
71};
72
73pub fn describe_insert(
80 scx: &StatementContext,
81 InsertStatement {
82 table_name,
83 columns,
84 source,
85 returning,
86 }: InsertStatement<Aug>,
87) -> Result<StatementDesc, PlanError> {
88 let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
89 let desc = if returning.expr.is_empty() {
90 None
91 } else {
92 Some(returning.desc)
93 };
94 Ok(StatementDesc::new(desc))
95}
96
97pub fn plan_insert(
98 scx: &StatementContext,
99 InsertStatement {
100 table_name,
101 columns,
102 source,
103 returning,
104 }: InsertStatement<Aug>,
105 params: &Params,
106) -> Result<Plan, PlanError> {
107 let (id, mut expr, returning) =
108 query::plan_insert_query(scx, table_name, columns, source, returning)?;
109 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
110 let returning = returning
111 .expr
112 .into_iter()
113 .map(|mut expr| {
114 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
115 expr.lower_uncorrelated(scx.catalog.system_vars())
116 })
117 .collect::<Result<Vec<_>, _>>()?;
118
119 Ok(Plan::Insert(InsertPlan {
120 id,
121 values: expr,
122 returning,
123 }))
124}
125
126pub fn describe_delete(
127 scx: &StatementContext,
128 stmt: DeleteStatement<Aug>,
129) -> Result<StatementDesc, PlanError> {
130 query::plan_delete_query(scx, stmt)?;
131 Ok(StatementDesc::new(None))
132}
133
134pub fn plan_delete(
135 scx: &StatementContext,
136 stmt: DeleteStatement<Aug>,
137 params: &Params,
138) -> Result<Plan, PlanError> {
139 let rtw_plan = query::plan_delete_query(scx, stmt)?;
140 plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
141}
142
143pub fn describe_update(
144 scx: &StatementContext,
145 stmt: UpdateStatement<Aug>,
146) -> Result<StatementDesc, PlanError> {
147 query::plan_update_query(scx, stmt)?;
148 Ok(StatementDesc::new(None))
149}
150
151pub fn plan_update(
152 scx: &StatementContext,
153 stmt: UpdateStatement<Aug>,
154 params: &Params,
155) -> Result<Plan, PlanError> {
156 let rtw_plan = query::plan_update_query(scx, stmt)?;
157 plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
158}
159
160pub fn plan_read_then_write(
161 scx: &StatementContext,
162 kind: MutationKind,
163 params: &Params,
164 query::ReadThenWritePlan {
165 id,
166 mut selection,
167 finishing,
168 assignments,
169 }: query::ReadThenWritePlan,
170) -> Result<Plan, PlanError> {
171 selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
172 let mut assignments_outer = BTreeMap::new();
173 for (idx, mut set) in assignments {
174 set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
175 let set = set.lower_uncorrelated(scx.catalog.system_vars())?;
176 assignments_outer.insert(idx, set);
177 }
178
179 Ok(Plan::ReadThenWrite(ReadThenWritePlan {
180 id,
181 selection,
182 finishing,
183 assignments: assignments_outer,
184 kind,
185 returning: Vec::new(),
186 }))
187}
188
189pub fn describe_select(
190 scx: &StatementContext,
191 stmt: SelectStatement<Aug>,
192) -> Result<StatementDesc, PlanError> {
193 if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
194 return Ok(StatementDesc::new(Some(desc)));
195 }
196
197 let query::PlannedRootQuery { desc, .. } =
198 query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
199 Ok(StatementDesc::new(Some(desc)))
200}
201
202pub fn plan_select(
203 scx: &StatementContext,
204 select: SelectStatement<Aug>,
205 params: &Params,
206 copy_to: Option<CopyFormat>,
207) -> Result<Plan, PlanError> {
208 if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
209 return Ok(Plan::SideEffectingFunc(f));
210 }
211
212 let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
213 Ok(Plan::Select(plan))
214}
215
/// Plans a `SELECT` statement into a [`SelectPlan`] plus the relation
/// description of its result.
///
/// Beyond planning the root query, this binds statement parameters, reduces
/// `TopK` offsets and the top-level `LIMIT`/`OFFSET` to literal values, and
/// enforces the restriction on unmaterializable functions in `AS OF` queries.
fn plan_select_inner(
    scx: &StatementContext,
    select: SelectStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<(SelectPlan, RelationDesc), PlanError> {
    // Plan the `AS OF` clause (if any) before the query itself.
    let when = query::plan_as_of(scx, select.as_of.clone())?;
    let lifetime = QueryLifetime::OneShot;
    let query::PlannedRootQuery {
        mut expr,
        desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
    expr.bind_parameters(scx, lifetime, params)?;

    // With parameters bound, every `TopK` offset is expected to reduce to a
    // constant; replace each offset expression with its literal Int64 value.
    expr.try_visit_mut_pre(&mut |expr| {
        if let HirRelationExpr::TopK { offset, .. } = expr {
            let offset_value = offset_into_value(offset.take())?;
            *offset = HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64);
        }
        Ok::<(), PlanError>(())
    })?;
    // The top-level LIMIT must be a constant: NULL means "no limit", and any
    // other value must be a non-negative Int64.
    let limit = match finishing.limit {
        None => None,
        Some(mut limit) => {
            limit.bind_parameters(scx, lifetime, params)?;
            let Some(limit) = limit.as_literal() else {
                sql_bail!(
                    "Top-level LIMIT must be a constant expression, got {}",
                    limit
                )
            };
            match limit {
                Datum::Null => None,
                Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
                _ => {
                    // Earlier validation should have rejected this; log in
                    // production, panic in builds with soft assertions.
                    soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
                    sql_bail!("LIMIT must be a non-negative INT or NULL")
                }
            }
        }
    };
    // Likewise reduce the top-level OFFSET to a constant value.
    let offset = {
        let mut offset = finishing.offset.clone();
        offset.bind_parameters(scx, lifetime, params)?;
        let offset = offset_into_value(offset.take())?;
        offset
            .try_into()
            .expect("checked in offset_into_value that it is not negative")
    };

    // When the feature flag is set, `AS OF` queries may not call
    // unmaterializable functions other than temporal ones (e.g. `mz_now`).
    if scx.is_feature_flag_enabled(&DISALLOW_UNMATERIALIZABLE_FUNCTIONS_AS_OF)
        && select.as_of.is_some()
        && expr.contains_unmaterializable_except_temporal()?
    {
        bail_unsupported!("unmaterializable function (except `mz_now`) in an AS OF query");
    }

    let plan = SelectPlan {
        source: expr,
        when,
        finishing: RowSetFinishing {
            limit,
            offset,
            project: finishing.project,
            order_by: finishing.order_by,
        },
        copy_to,
        // Retain the original AST for later use (e.g. introspection).
        select: Some(Box::new(select)),
    };

    Ok((plan, desc))
}
305
306pub fn describe_explain_plan(
307 scx: &StatementContext,
308 explain: ExplainPlanStatement<Aug>,
309) -> Result<StatementDesc, PlanError> {
310 let mut relation_desc = RelationDesc::builder();
311
312 match explain.stage() {
313 ExplainStage::RawPlan => {
314 let name = "Raw Plan";
315 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
316 }
317 ExplainStage::DecorrelatedPlan => {
318 let name = "Decorrelated Plan";
319 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
320 }
321 ExplainStage::LocalPlan => {
322 let name = "Locally Optimized Plan";
323 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
324 }
325 ExplainStage::GlobalPlan => {
326 let name = "Optimized Plan";
327 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
328 }
329 ExplainStage::PhysicalPlan => {
330 let name = "Physical Plan";
331 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
332 }
333 ExplainStage::Trace => {
334 relation_desc = relation_desc
335 .with_column("Time", SqlScalarType::UInt64.nullable(false))
336 .with_column("Path", SqlScalarType::String.nullable(false))
337 .with_column("Plan", SqlScalarType::String.nullable(false));
338 }
339 ExplainStage::PlanInsights => {
340 let name = "Plan Insights";
341 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
342 }
343 };
344 let relation_desc = relation_desc.finish();
345
346 Ok(
347 StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
348 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
349 _ => vec![],
350 }),
351 )
352}
353
354pub fn describe_explain_pushdown(
355 scx: &StatementContext,
356 statement: ExplainPushdownStatement<Aug>,
357) -> Result<StatementDesc, PlanError> {
358 let relation_desc = RelationDesc::builder()
359 .with_column("Source", SqlScalarType::String.nullable(false))
360 .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
361 .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
362 .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
363 .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
364 .finish();
365
366 Ok(
367 StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
368 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
369 _ => vec![],
370 }),
371 )
372}
373
/// Computes the statement description for `EXPLAIN ANALYZE` on an object.
///
/// The output columns depend on the requested properties (`MEMORY`, `CPU`,
/// or `HINTS`) and on whether a per-worker `SKEW` breakdown was requested;
/// column order follows the order in which properties were written, with
/// duplicate properties contributing columns only once.
pub fn describe_explain_analyze_object(
    _scx: &StatementContext,
    statement: ExplainAnalyzeObjectStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // With `AS SQL`, the result is the generated introspection SQL itself.
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    match statement.properties {
        ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
            properties,
            skew,
        }) => {
            // Every computation report starts with the operator column.
            let mut relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(false));

            // SKEW adds a per-worker breakdown keyed by worker id.
            if skew {
                relation_desc =
                    relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
            }

            // Add each property's columns once, in first-mention order.
            let mut seen_properties = BTreeSet::new();
            for property in properties {
                if !seen_properties.insert(property) {
                    continue;
                }

                match property {
                    // MEMORY with SKEW: per-worker + aggregate memory/records.
                    ExplainAnalyzeComputationProperty::Memory if skew => {
                        let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                        relation_desc = relation_desc
                            .with_column("memory_ratio", numeric.clone())
                            .with_column("worker_memory", SqlScalarType::String.nullable(true))
                            .with_column("avg_memory", SqlScalarType::String.nullable(true))
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column("records_ratio", numeric.clone())
                            .with_column("worker_records", numeric.clone())
                            .with_column("avg_records", numeric.clone())
                            .with_column("total_records", numeric);
                    }
                    // MEMORY without SKEW: aggregates only.
                    ExplainAnalyzeComputationProperty::Memory => {
                        relation_desc = relation_desc
                            .with_column("total_memory", SqlScalarType::String.nullable(true))
                            .with_column(
                                "total_records",
                                SqlScalarType::Numeric { max_scale: None }.nullable(true),
                            );
                    }
                    // CPU: optional per-worker columns, always a total.
                    ExplainAnalyzeComputationProperty::Cpu => {
                        if skew {
                            relation_desc = relation_desc
                                .with_column(
                                    "cpu_ratio",
                                    SqlScalarType::Numeric { max_scale: None }.nullable(true),
                                )
                                .with_column(
                                    "worker_elapsed",
                                    SqlScalarType::Interval.nullable(true),
                                )
                                .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
                        }
                        relation_desc = relation_desc
                            .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
                    }
                }
            }

            let relation_desc = relation_desc.finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
        // HINTS: fixed set of memory-hint columns.
        ExplainAnalyzeProperty::Hints => {
            let relation_desc = RelationDesc::builder()
                .with_column("operator", SqlScalarType::String.nullable(true))
                .with_column("levels", SqlScalarType::Int64.nullable(true))
                .with_column("to_cut", SqlScalarType::Int64.nullable(true))
                .with_column("hint", SqlScalarType::Float64.nullable(true))
                .with_column("savings", SqlScalarType::String.nullable(true))
                .finish();
            Ok(StatementDesc::new(Some(relation_desc)))
        }
    }
}
460
/// Computes the statement description for `EXPLAIN ANALYZE ... CLUSTER`.
///
/// Like [`describe_explain_analyze_object`], but reports per-object rather
/// than per-operator: the leading columns identify the object, and ratio
/// columns are named `max_operator_*`. Column order follows the order in
/// which properties were written, with duplicates contributing only once.
pub fn describe_explain_analyze_cluster(
    _scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
) -> Result<StatementDesc, PlanError> {
    // With `AS SQL`, the result is the generated introspection SQL itself.
    if statement.as_sql {
        let relation_desc = RelationDesc::builder()
            .with_column("SQL", SqlScalarType::String.nullable(false))
            .finish();
        return Ok(StatementDesc::new(Some(relation_desc)));
    }

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;

    // Every row identifies an object on the cluster.
    let mut relation_desc = RelationDesc::builder()
        .with_column("object", SqlScalarType::String.nullable(false))
        .with_column("global_id", SqlScalarType::String.nullable(false));

    // SKEW adds a per-worker breakdown keyed by worker id.
    if skew {
        relation_desc =
            relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
    }

    // Add each property's columns once, in first-mention order.
    let mut seen_properties = BTreeSet::new();
    for property in properties {
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            // MEMORY with SKEW: per-worker + aggregate memory/records.
            ExplainAnalyzeComputationProperty::Memory if skew => {
                let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
                relation_desc = relation_desc
                    .with_column("max_operator_memory_ratio", numeric.clone())
                    .with_column("worker_memory", SqlScalarType::String.nullable(true))
                    .with_column("avg_memory", SqlScalarType::String.nullable(true))
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column("max_operator_records_ratio", numeric.clone())
                    .with_column("worker_records", numeric.clone())
                    .with_column("avg_records", numeric.clone())
                    .with_column("total_records", numeric);
            }
            // MEMORY without SKEW: aggregates only.
            ExplainAnalyzeComputationProperty::Memory => {
                relation_desc = relation_desc
                    .with_column("total_memory", SqlScalarType::String.nullable(true))
                    .with_column(
                        "total_records",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    );
            }
            // CPU with SKEW: per-worker columns plus the total.
            ExplainAnalyzeComputationProperty::Cpu if skew => {
                relation_desc = relation_desc
                    .with_column(
                        "max_operator_cpu_ratio",
                        SqlScalarType::Numeric { max_scale: None }.nullable(true),
                    )
                    .with_column("worker_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true))
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
            // CPU without SKEW: just the total.
            ExplainAnalyzeComputationProperty::Cpu => {
                relation_desc = relation_desc
                    .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
            }
        }
    }

    Ok(StatementDesc::new(Some(relation_desc.finish())))
}
530
531pub fn describe_explain_timestamp(
532 scx: &StatementContext,
533 ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
534) -> Result<StatementDesc, PlanError> {
535 let relation_desc = RelationDesc::builder()
536 .with_column("Timestamp", SqlScalarType::String.nullable(false))
537 .finish();
538
539 Ok(StatementDesc::new(Some(relation_desc))
540 .with_params(describe_select(scx, select)?.param_types))
541}
542
543pub fn describe_explain_schema(
544 _: &StatementContext,
545 ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
546) -> Result<StatementDesc, PlanError> {
547 let relation_desc = RelationDesc::builder()
548 .with_column("Schema", SqlScalarType::String.nullable(false))
549 .finish();
550 Ok(StatementDesc::new(Some(relation_desc)))
551}
552
// Generates `ExplainPlanOptionExtracted`: a struct collecting the values of
// the `WITH (...)` options accepted by `EXPLAIN ... PLAN`, with the listed
// default applied when an option is omitted.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
595
/// Converts the extracted `EXPLAIN ... PLAN` `WITH (...)` options into an
/// [`ExplainConfig`].
impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
    type Error = PlanError;

    fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
        // `RAW` is shorthand for both `RAW PLANS` and `RAW SYNTAX`.
        if v.raw {
            v.raw_plans = true;
            v.raw_syntax = true;
        }

        // Unset tri-state options default to "on" in production builds and
        // "off" in builds where soft assertions are enabled.
        let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();

        Ok(ExplainConfig {
            arity: v.arity.unwrap_or(enable_on_prod),
            cardinality: v.cardinality,
            column_names: v.column_names,
            filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
            // Humanized expressions and linear chains are suppressed when
            // raw plans are requested.
            humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
            join_impls: v.join_implementations,
            keys: v.keys,
            linear_chains: !v.raw_plans && v.linear_chains,
            no_fast_path: v.no_fast_path,
            no_notices: v.no_notices,
            node_ids: v.node_identifiers,
            non_negative: v.non_negative,
            raw_plans: v.raw_plans,
            raw_syntax: v.raw_syntax,
            // Set later by `plan_explain_plan` based on the output format.
            verbose_syntax: false,
            redacted: v.redacted,
            subtree_size: v.subtree_size,
            equivalences: v.equivalences,
            timing: v.timing,
            types: v.types,
            // Optimizer feature overrides taken from the options; fields not
            // exposed as options keep their defaults.
            features: OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: Default::default(),
                enable_eager_delta_joins: v.enable_eager_delta_joins,
                enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
                enable_consolidate_after_union_negate: Default::default(),
                enable_reduce_mfp_fusion: Default::default(),
                enable_cardinality_estimates: Default::default(),
                persist_fast_path_limit: Default::default(),
                reoptimize_imported_views: v.reoptimize_imported_views,
                enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse: v
                    .enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: Default::default(),
                enable_dequadratic_eqprop_map: Default::default(),
                enable_eq_classes_withholding_errors: Default::default(),
                enable_fast_path_plan_insights: Default::default(),
                enable_cast_elimination: Default::default(),
                enable_case_literal_transform: Default::default(),
            },
        })
    }
}
657
658fn plan_explainee(
659 scx: &StatementContext,
660 explainee: Explainee<Aug>,
661 params: &Params,
662) -> Result<plan::Explainee, PlanError> {
663 use crate::plan::ExplaineeStatement;
664
665 let is_replan = matches!(
666 explainee,
667 Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
668 );
669
670 let explainee = match explainee {
671 Explainee::View(name) | Explainee::ReplanView(name) => {
672 let item = scx.get_item_by_resolved_name(&name)?;
673 let item_type = item.item_type();
674 if item_type != CatalogItemType::View {
675 sql_bail!("Expected {name} to be a view, not a {item_type}");
676 }
677 match is_replan {
678 true => crate::plan::Explainee::ReplanView(item.id()),
679 false => crate::plan::Explainee::View(item.id()),
680 }
681 }
682 Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
683 let item = scx.get_item_by_resolved_name(&name)?;
684 let item_type = item.item_type();
685 if item_type != CatalogItemType::MaterializedView {
686 sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
687 }
688 match is_replan {
689 true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
690 false => crate::plan::Explainee::MaterializedView(item.id()),
691 }
692 }
693 Explainee::Index(name) | Explainee::ReplanIndex(name) => {
694 let item = scx.get_item_by_resolved_name(&name)?;
695 let item_type = item.item_type();
696 if item_type != CatalogItemType::Index {
697 sql_bail!("Expected {name} to be an index, not a {item_type}");
698 }
699 match is_replan {
700 true => crate::plan::Explainee::ReplanIndex(item.id()),
701 false => crate::plan::Explainee::Index(item.id()),
702 }
703 }
704 Explainee::Select(select, broken) => {
705 let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
706 crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
707 }
708 Explainee::CreateView(mut stmt, broken) => {
709 if stmt.if_exists != IfExistsBehavior::Skip {
710 stmt.if_exists = IfExistsBehavior::Skip;
715 } else {
716 sql_bail!(
717 "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
718 (the behavior is implied within the scope of an enclosing EXPLAIN)"
719 );
720 }
721
722 let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
723 sql_bail!("expected CreateViewPlan plan");
724 };
725
726 crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
727 }
728 Explainee::CreateMaterializedView(mut stmt, broken) => {
729 if stmt.if_exists != IfExistsBehavior::Skip {
730 stmt.if_exists = IfExistsBehavior::Skip;
735 } else {
736 sql_bail!(
737 "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
738 (the behavior is implied within the scope of an enclosing EXPLAIN)"
739 );
740 }
741
742 let Plan::CreateMaterializedView(plan) =
743 ddl::plan_create_materialized_view(scx, *stmt)?
744 else {
745 sql_bail!("expected CreateMaterializedViewPlan plan");
746 };
747
748 crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
749 broken,
750 plan,
751 })
752 }
753 Explainee::CreateIndex(mut stmt, broken) => {
754 if !stmt.if_not_exists {
755 stmt.if_not_exists = true;
758 } else {
759 sql_bail!(
760 "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
761 (the behavior is implied within the scope of an enclosing EXPLAIN)"
762 );
763 }
764
765 let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
766 sql_bail!("expected CreateIndexPlan plan");
767 };
768
769 crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
770 }
771 Explainee::Subscribe(stmt, broken) => {
772 let Plan::Subscribe(plan) = plan_subscribe(scx, *stmt, params, None)? else {
773 sql_bail!("expected SubscribePlan");
774 };
775 crate::plan::Explainee::Statement(ExplaineeStatement::Subscribe { broken, plan })
776 }
777 };
778
779 Ok(explainee)
780}
781
782pub fn plan_explain_plan(
783 scx: &StatementContext,
784 explain: ExplainPlanStatement<Aug>,
785 params: &Params,
786) -> Result<Plan, PlanError> {
787 let (format, verbose_syntax) = match explain.format() {
788 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
789 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
790 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
791 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
792 };
793 let stage = explain.stage();
794
795 let mut config = {
797 let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
798
799 if !scx.catalog.system_vars().persist_stats_filter_enabled() {
800 with_options.filter_pushdown = Some(false);
802 }
803
804 ExplainConfig::try_from(with_options)?
805 };
806 config.verbose_syntax = verbose_syntax;
807
808 let explainee = plan_explainee(scx, explain.explainee, params)?;
809
810 Ok(Plan::ExplainPlan(ExplainPlanPlan {
811 stage,
812 format,
813 config,
814 explainee,
815 }))
816}
817
/// Plans `EXPLAIN SCHEMA FOR CREATE SINK ...`.
///
/// The inner `CREATE SINK` statement is planned under a placeholder name and
/// the Avro JSON schema of the resulting Kafka sink's key or value (as
/// requested by `schema_for`) is returned. Only Kafka sinks with an
/// Avro-formatted value are supported.
pub fn plan_explain_schema(
    scx: &StatementContext,
    explain_schema: ExplainSinkSchemaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let ExplainSinkSchemaStatement {
        schema_for,
        format: _,
        mut statement,
    } = explain_schema;

    // The inner statement is never executed, so give the sink a fixed
    // placeholder name.
    statement.name = Some(UnresolvedItemName::qualified(&[
        ident!("mz_catalog"),
        ident!("mz_explain_schema"),
    ]));

    // Purify the statement's Avro `DOC ON` options before planning; see
    // `purify_create_sink_avro_doc_on_options` for the exact semantics.
    crate::pure::purify_create_sink_avro_doc_on_options(
        scx.catalog,
        *statement.from.item_id(),
        &mut statement.format,
    )?;

    match ddl::plan_create_sink(scx, statement)? {
        // Match only sinks whose *value* format is Avro; everything else is
        // rejected below.
        Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
            StorageSinkConnection::Kafka(KafkaSinkConnection {
                format:
                    KafkaSinkFormat {
                        key_format,
                        value_format:
                            KafkaSinkFormatType::Avro {
                                schema: value_schema,
                                ..
                            },
                        ..
                    },
                ..
            }) => {
                // Pick the requested schema; the key schema exists only when
                // the sink has an Avro-formatted key.
                let schema = match schema_for {
                    ExplainSinkSchemaFor::Key => key_format
                        .and_then(|f| match f {
                            KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
                            _ => None,
                        })
                        .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
                    ExplainSinkSchemaFor::Value => value_schema,
                };

                Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
                    sink_from: sink.from,
                    json_schema: schema,
                }))
            }
            _ => bail_unsupported!(
                "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
            ),
        },
        _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
    }
}
880
881pub fn plan_explain_pushdown(
882 scx: &StatementContext,
883 statement: ExplainPushdownStatement<Aug>,
884 params: &Params,
885) -> Result<Plan, PlanError> {
886 scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
887 let explainee = plan_explainee(scx, statement.explainee, params)?;
888 Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
889}
890
891pub fn plan_explain_analyze_object(
892 scx: &StatementContext,
893 statement: ExplainAnalyzeObjectStatement<Aug>,
894 params: &Params,
895) -> Result<Plan, PlanError> {
896 let explainee_name = statement
897 .explainee
898 .name()
899 .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
900 .full_name_str();
901 let explainee = plan_explainee(scx, statement.explainee, params)?;
902
903 match explainee {
904 plan::Explainee::Index(_index_id) => (),
905 plan::Explainee::MaterializedView(_item_id) => (),
906 _ => {
907 return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
908 }
909 };
910
911 let mut ctes = Vec::with_capacity(4); let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
926 let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
927 let mut predicates = vec![format!("mo.name = '{}'", explainee_name)];
928 let mut order_by = vec!["mlm.lir_id DESC"];
929
930 match statement.properties {
931 ExplainAnalyzeProperty::Computation(ExplainAnalyzeComputationProperties {
932 properties,
933 skew,
934 }) => {
935 let mut worker_id = None;
936 let mut seen_properties = BTreeSet::new();
937 for property in properties {
938 if !seen_properties.insert(property) {
940 continue;
941 }
942
943 match property {
944 ExplainAnalyzeComputationProperty::Memory => {
945 ctes.push((
946 "summary_memory",
947 r#"
948 SELECT mlm.global_id AS global_id,
949 mlm.lir_id AS lir_id,
950 SUM(mas.size) AS total_memory,
951 SUM(mas.records) AS total_records,
952 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
953 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
954 FROM mz_introspection.mz_lir_mapping mlm
955 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
956 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
957 ON (mas.operator_id = valid_id)
958GROUP BY mlm.global_id, mlm.lir_id"#,
959 ));
960 from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");
961
962 if skew {
963 ctes.push((
964 "per_worker_memory",
965 r#"
966 SELECT mlm.global_id AS global_id,
967 mlm.lir_id AS lir_id,
968 mas.worker_id AS worker_id,
969 SUM(mas.size) AS worker_memory,
970 SUM(mas.records) AS worker_records
971 FROM mz_introspection.mz_lir_mapping mlm
972 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
973 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
974 ON (mas.operator_id = valid_id)
975GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
976 ));
977 from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");
978
979 if let Some(worker_id) = worker_id {
980 predicates.push(format!("pwm.worker_id = {worker_id}"));
981 } else {
982 worker_id = Some("pwm.worker_id");
983 columns.push("pwm.worker_id AS worker_id");
984 order_by.push("worker_id");
985 }
986
987 columns.extend([
988 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
989 "pg_size_pretty(pwm.worker_memory) AS worker_memory",
990 "pg_size_pretty(sm.avg_memory) AS avg_memory",
991 "pg_size_pretty(sm.total_memory) AS total_memory",
992 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
993 "pwm.worker_records AS worker_records",
994 "sm.avg_records AS avg_records",
995 "sm.total_records AS total_records",
996 ]);
997 } else {
998 columns.extend([
999 "pg_size_pretty(sm.total_memory) AS total_memory",
1000 "sm.total_records AS total_records",
1001 ]);
1002 }
1003 }
1004 ExplainAnalyzeComputationProperty::Cpu => {
1005 ctes.push((
1006 "summary_cpu",
1007 r#"
1008 SELECT mlm.global_id AS global_id,
1009 mlm.lir_id AS lir_id,
1010 SUM(mse.elapsed_ns) AS total_ns,
1011 CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
1012 FROM mz_introspection.mz_lir_mapping mlm
1013 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1014 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1015 ON (mse.id = valid_id)
1016GROUP BY mlm.global_id, mlm.lir_id"#,
1017 ));
1018 from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");
1019
1020 if skew {
1021 ctes.push((
1022 "per_worker_cpu",
1023 r#"
1024 SELECT mlm.global_id AS global_id,
1025 mlm.lir_id AS lir_id,
1026 mse.worker_id AS worker_id,
1027 SUM(mse.elapsed_ns) AS worker_ns
1028 FROM mz_introspection.mz_lir_mapping mlm
1029 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
1030 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
1031 ON (mse.id = valid_id)
1032GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
1033 ));
1034 from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");
1035
1036 if let Some(worker_id) = worker_id {
1037 predicates.push(format!("pwc.worker_id = {worker_id}"));
1038 } else {
1039 worker_id = Some("pwc.worker_id");
1040 columns.push("pwc.worker_id AS worker_id");
1041 order_by.push("worker_id");
1042 }
1043
1044 columns.extend([
1045 "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
1046 "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
1047 "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
1048 ]);
1049 }
1050 columns.push(
1051 "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
1052 );
1053 }
1054 }
1055 }
1056 }
1057 ExplainAnalyzeProperty::Hints => {
1058 columns.extend([
1059 "megsa.levels AS levels",
1060 "megsa.to_cut AS to_cut",
1061 "megsa.hint AS hint",
1062 "pg_size_pretty(megsa.savings) AS savings",
1063 ]);
1064 from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
1065 "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
1066 mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
1067 }
1068 }
1069
1070 from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");
1071
1072 let ctes = if !ctes.is_empty() {
1073 format!(
1074 "WITH {}",
1075 separated(
1076 ",\n",
1077 ctes.iter()
1078 .map(|(name, defn)| format!("{name} AS ({defn})"))
1079 )
1080 )
1081 } else {
1082 String::new()
1083 };
1084 let columns = separated(", ", columns);
1085 let from = separated(" ", from);
1086 let predicates = separated(" AND ", predicates);
1087 let order_by = separated(", ", order_by);
1088 let query = format!(
1089 r#"{ctes}
1090SELECT {columns}
1091FROM {from}
1092WHERE {predicates}
1093ORDER BY {order_by}"#
1094 );
1095
1096 if statement.as_sql {
1097 let rows = vec![Row::pack_slice(&[Datum::String(
1098 &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1099 PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1100 })?,
1101 )])];
1102 let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1103
1104 Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1105 } else {
1106 let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1107 show_select.plan()
1108 }
1109}
1110
/// Plans `EXPLAIN ANALYZE ... FOR CLUSTER`: reports memory and/or CPU usage
/// for every mappable object on the cluster by assembling a `SELECT` over
/// `mz_introspection` relations and planning it like a `SHOW` statement.
///
/// When `statement.as_sql` is set, the generated SQL text is returned as a
/// one-row result instead of being planned and executed.
pub fn plan_explain_analyze_cluster(
    scx: &StatementContext,
    statement: ExplainAnalyzeClusterStatement,
    _params: &Params,
) -> Result<Plan, PlanError> {
    // Query fragments assembled below: CTE (name, definition) pairs,
    // select-list entries, FROM/JOIN clauses, WHERE conjuncts, ORDER BY keys.
    let mut ctes = Vec::with_capacity(4); let mut columns = vec!["mo.name AS object", "mo.global_id AS global_id"];
    let mut from = vec!["mz_introspection.mz_mappable_objects mo"];
    let mut predicates = vec![];
    let mut order_by = vec![];

    let ExplainAnalyzeComputationProperties { properties, skew } = statement.properties;
    // Which table's `worker_id` column is exposed in the output. The first
    // skew-enabled property claims it; later properties equate theirs to it
    // via a predicate rather than adding a second output column.
    let mut worker_id = None;
    let mut seen_properties = BTreeSet::new();
    for property in properties {
        // Process each property at most once; silently skip duplicates.
        if !seen_properties.insert(property) {
            continue;
        }

        match property {
            ExplainAnalyzeComputationProperty::Memory => {
                if skew {
                    // Only the property that introduced the worker_id output
                    // column appends it to ORDER BY (at the end, below).
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        predicates.push(format!("om.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("om.worker_id");
                        columns.push("om.worker_id AS worker_id");
                        set_worker_id = true; };

                    // Per-(object, operator) totals plus per-worker averages.
                    ctes.push((
                        "per_operator_memory_summary",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mas.size) AS total_memory,
       SUM(mas.records) AS total_records,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
       CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
       ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "per_operator_memory_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mas.worker_id AS worker_id,
       SUM(mas.size) AS worker_memory,
       SUM(mas.records) AS worker_records
FROM mz_introspection.mz_lir_mapping mlm
     CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
     JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
       ON (mas.operator_id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
                    ));

                    // Ratio of each worker's usage to the cross-worker average,
                    // per operator; this is the "skew" signal.
                    ctes.push((
                        "per_operator_memory_ratios",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.lir_id AS lir_id,
       pompw.worker_id AS worker_id,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_memory <> 0 THEN ROUND(pompw.worker_memory / poms.avg_memory, 2) ELSE NULL END AS memory_ratio,
       CASE WHEN pompw.worker_id IS NOT NULL AND poms.avg_records <> 0 THEN ROUND(pompw.worker_records / poms.avg_records, 2) ELSE NULL END AS records_ratio
 FROM per_operator_memory_per_worker pompw
 JOIN per_operator_memory_summary poms
 USING (global_id, lir_id)
"#,
                    ));

                    // Roll operators up to (object, worker): worst operator
                    // ratio plus the worker's total memory/records.
                    ctes.push((
                        "object_memory",
                        r#"
SELECT pompw.global_id AS global_id,
       pompw.worker_id AS worker_id,
       MAX(pomr.memory_ratio) AS max_operator_memory_ratio,
       MAX(pomr.records_ratio) AS max_operator_records_ratio,
       SUM(pompw.worker_memory) AS worker_memory,
       SUM(pompw.worker_records) AS worker_records
FROM per_operator_memory_per_worker pompw
 JOIN per_operator_memory_ratios pomr
 USING (global_id, worker_id, lir_id)
GROUP BY pompw.global_id, pompw.worker_id
"#,
                    ));

                    ctes.push(("object_average_memory", r#"
SELECT om.global_id AS global_id,
       SUM(om.worker_memory) AS total_memory,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_memory) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_memory,
       SUM(om.worker_records) AS total_records,
       CASE WHEN COUNT(DISTINCT om.worker_id) <> 0 THEN SUM(om.worker_records) / COUNT(DISTINCT om.worker_id) ELSE NULL END AS avg_records
 FROM object_memory om
GROUP BY om.global_id"#));

                    from.push("LEFT JOIN object_memory om USING (global_id)");
                    from.push("LEFT JOIN object_average_memory oam USING (global_id)");

                    columns.extend([
                        "om.max_operator_memory_ratio AS max_operator_memory_ratio",
                        "pg_size_pretty(om.worker_memory) AS worker_memory",
                        "pg_size_pretty(oam.avg_memory) AS avg_memory",
                        "pg_size_pretty(oam.total_memory) AS total_memory",
                        "om.max_operator_records_ratio AS max_operator_records_ratio",
                        "om.worker_records AS worker_records",
                        "oam.avg_records AS avg_records",
                        "oam.total_records AS total_records",
                    ]);

                    // `om.worker_memory` (the raw numeric column) is used for
                    // ordering, not the pretty-printed string alias.
                    order_by.extend([
                        "max_operator_memory_ratio DESC",
                        "max_operator_records_ratio DESC",
                        "om.worker_memory DESC",
                        "worker_records DESC",
                    ]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    // No skew analysis: just total memory/records per object.
                    ctes.push((
                        "per_operator_memory_totals",
                        r#"
    SELECT mlm.global_id AS global_id,
           mlm.lir_id AS lir_id,
           SUM(mas.size) AS total_memory,
           SUM(mas.records) AS total_records
    FROM mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
         JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
           ON (mas.operator_id = valid_id)
    GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_memory_totals",
                        r#"
SELECT pomt.global_id AS global_id,
       SUM(pomt.total_memory) AS total_memory,
       SUM(pomt.total_records) AS total_records
FROM per_operator_memory_totals pomt
GROUP BY pomt.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_memory_totals omt USING (global_id)");
                    columns.extend([
                        "pg_size_pretty(omt.total_memory) AS total_memory",
                        "omt.total_records AS total_records",
                    ]);
                    order_by.extend(["omt.total_memory DESC", "total_records DESC"]);
                }
            }
            ExplainAnalyzeComputationProperty::Cpu => {
                // Mirrors the Memory arm, but over scheduling (elapsed ns).
                if skew {
                    let mut set_worker_id = false;
                    if let Some(worker_id) = worker_id {
                        predicates.push(format!("oc.worker_id = {worker_id}"));
                    } else {
                        worker_id = Some("oc.worker_id");
                        columns.push("oc.worker_id AS worker_id");
                        set_worker_id = true; };

                    ctes.push((
                        "per_operator_cpu_summary",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       SUM(mse.elapsed_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
FROM mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
    JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
      ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id"#,
));

                    ctes.push((
                        "per_operator_cpu_per_worker",
                        r#"
SELECT mlm.global_id AS global_id,
       mlm.lir_id AS lir_id,
       mse.worker_id AS worker_id,
       SUM(mse.elapsed_ns) AS worker_ns
FROM mz_introspection.mz_lir_mapping mlm
CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
    JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
      ON (mse.id = valid_id)
GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
                    ));

                    ctes.push((
                        "per_operator_cpu_ratios",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.lir_id AS lir_id,
       pocpw.worker_id AS worker_id,
       CASE WHEN pocpw.worker_id IS NOT NULL AND pocs.avg_ns <> 0 THEN ROUND(pocpw.worker_ns / pocs.avg_ns, 2) ELSE NULL END AS cpu_ratio
FROM per_operator_cpu_per_worker pocpw
 JOIN per_operator_cpu_summary pocs
 USING (global_id, lir_id)
"#,
                    ));

                    ctes.push((
                        "object_cpu",
                        r#"
SELECT pocpw.global_id AS global_id,
       pocpw.worker_id AS worker_id,
       MAX(pomr.cpu_ratio) AS max_operator_cpu_ratio,
       SUM(pocpw.worker_ns) AS worker_ns
FROM per_operator_cpu_per_worker pocpw
 JOIN per_operator_cpu_ratios pomr
 USING (global_id, worker_id, lir_id)
GROUP BY pocpw.global_id, pocpw.worker_id
"#,
                    ));

                    ctes.push((
                        "object_average_cpu",
                        r#"
SELECT oc.global_id AS global_id,
       SUM(oc.worker_ns) AS total_ns,
       CASE WHEN COUNT(DISTINCT oc.worker_id) <> 0 THEN SUM(oc.worker_ns) / COUNT(DISTINCT oc.worker_id) ELSE NULL END AS avg_ns
 FROM object_cpu oc
GROUP BY oc.global_id"#,));

                    from.push("LEFT JOIN object_cpu oc USING (global_id)");
                    from.push("LEFT JOIN object_average_cpu oac USING (global_id)");

                    columns.extend([
                        "oc.max_operator_cpu_ratio AS max_operator_cpu_ratio",
                        "oc.worker_ns / 1000 * '1 microsecond'::interval AS worker_elapsed",
                        "oac.avg_ns / 1000 * '1 microsecond'::interval AS avg_elapsed",
                        "oac.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed",
                    ]);

                    order_by.extend(["max_operator_cpu_ratio DESC", "worker_elapsed DESC"]);

                    if set_worker_id {
                        order_by.push("worker_id");
                    }
                } else {
                    ctes.push((
                        "per_operator_cpu_totals",
                        r#"
    SELECT mlm.global_id AS global_id,
           mlm.lir_id AS lir_id,
           SUM(mse.elapsed_ns) AS total_ns
    FROM mz_introspection.mz_lir_mapping mlm
         CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
         JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
           ON (mse.id = valid_id)
    GROUP BY mlm.global_id, mlm.lir_id"#,
                    ));

                    ctes.push((
                        "object_cpu_totals",
                        r#"
SELECT poct.global_id AS global_id,
       SUM(poct.total_ns) AS total_ns
FROM per_operator_cpu_totals poct
GROUP BY poct.global_id
"#,
                    ));

                    from.push("LEFT JOIN object_cpu_totals oct USING (global_id)");
                    columns
                        .push("oct.total_ns / 1000 * '1 microsecond'::interval AS total_elapsed");
                    order_by.extend(["total_elapsed DESC"]);
                }
            }
        }
    }

    // Assemble the final query text from the collected fragments.
    let ctes = if !ctes.is_empty() {
        format!(
            "WITH {}",
            separated(
                ",\n",
                ctes.iter()
                    .map(|(name, defn)| format!("{name} AS ({defn})"))
            )
        )
    } else {
        String::new()
    };
    let columns = separated(", ", columns);
    let from = separated(" ", from);
    let predicates = if !predicates.is_empty() {
        format!("WHERE {}", separated(" AND ", predicates))
    } else {
        String::new()
    };
    // Final tie-breaker (and the sole ordering when no properties were given).
    order_by.push("mo.name DESC");
    let order_by = separated(", ", order_by);
    let query = format!(
        r#"{ctes}
SELECT {columns}
FROM {from}
{predicates}
ORDER BY {order_by}"#
    );

    if statement.as_sql {
        // AS SQL: hand back the prettified query text instead of running it.
        let rows = vec![Row::pack_slice(&[Datum::String(
            &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
                PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
            })?,
        )])];
        let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);

        Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
    } else {
        // Otherwise plan the generated query just like a SHOW statement.
        let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
        show_select.plan()
    }
}
1476
1477pub fn plan_explain_timestamp(
1478 scx: &StatementContext,
1479 explain: ExplainTimestampStatement<Aug>,
1480) -> Result<Plan, PlanError> {
1481 let (format, _verbose_syntax) = match explain.format() {
1482 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1483 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1484 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1485 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1486 };
1487
1488 let raw_plan = {
1489 let query::PlannedRootQuery {
1490 expr: raw_plan,
1491 desc: _,
1492 finishing: _,
1493 scope: _,
1494 } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1495 if raw_plan.contains_parameters()? {
1496 return Err(PlanError::ParameterNotAllowed(
1497 "EXPLAIN TIMESTAMP".to_string(),
1498 ));
1499 }
1500
1501 raw_plan
1502 };
1503 let when = query::plan_as_of(scx, explain.select.as_of)?;
1504
1505 Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1506 format,
1507 raw_plan,
1508 when,
1509 }))
1510}
1511
// Generates `SubscribeOptionExtracted`, which collects the typed values of the
// `SNAPSHOT` and `PROGRESS` options from a list of `SubscribeOption`s.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1513
/// Computes the output relation description ([`StatementDesc`]) of a
/// `SUBSCRIBE` statement without fully planning it.
pub fn describe_subscribe(
    scx: &StatementContext,
    stmt: SubscribeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // Resolve the description of the subscribed-to relation or query.
    let relation_desc = match stmt.relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            match item.relation_desc() {
                Some(desc) => desc.into_owned(),
                None => sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                ),
            }
        }
        SubscribeRelation::Query(query) => {
            let query::PlannedRootQuery { desc, .. } =
                query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
            desc
        }
    };
    let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
    let progress = progress.unwrap_or(false);
    // Every SUBSCRIBE output begins with an `mz_timestamp` column.
    let mut desc = RelationDesc::builder().with_column(
        "mz_timestamp",
        SqlScalarType::Numeric {
            max_scale: Some(NumericMaxScale::ZERO),
        }
        .nullable(false),
    );
    // WITH (PROGRESS) adds an `mz_progressed` flag column.
    if progress {
        desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
    }

    let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
    match stmt.output {
        SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
            desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
            for (name, mut ty) in relation_desc.into_iter() {
                // Progress rows carry NULL data columns, so data columns must
                // become nullable when PROGRESS is requested.
                if progress {
                    ty.nullable = true;
                }
                desc = desc.with_column(name, ty);
            }
        }
        SubscribeOutput::EnvelopeUpsert { key_columns }
        | SubscribeOutput::EnvelopeDebezium { key_columns } => {
            desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
            let key_columns = key_columns
                .into_iter()
                .map(normalize::column_name)
                .collect_vec();
            let mut before_values_desc = RelationDesc::builder();
            let mut after_values_desc = RelationDesc::builder();

            // Key columns come first, in the order the user specified them.
            for column_name in &key_columns {
                let mut column_ty = relation_desc
                    .get_by_name(column_name)
                    .map(|(_pos, ty)| ty.clone())
                    .ok_or_else(|| PlanError::UnknownColumn {
                        table: None,
                        column: column_name.clone(),
                        similar: Box::new([]),
                    })?;
                if progress {
                    column_ty.nullable = true;
                }
                desc = desc.with_column(column_name, column_ty);
            }

            // Non-key (value) columns follow, always nullable. Debezium gets
            // both `before_*` and `after_*` variants; Upsert keeps the plain
            // column names.
            for (mut name, mut ty) in relation_desc
                .into_iter()
                .filter(|(name, _ty)| !key_columns.contains(name))
            {
                ty.nullable = true;
                before_values_desc =
                    before_values_desc.with_column(format!("before_{}", name), ty.clone());
                if debezium {
                    name = format!("after_{}", name).into();
                }
                after_values_desc = after_values_desc.with_column(name, ty);
            }

            if debezium {
                desc = desc.concat(before_values_desc);
            }
            desc = desc.concat(after_values_desc);
        }
    }
    Ok(StatementDesc::new(Some(desc.finish())))
}
1609
/// Plans a `SUBSCRIBE` statement into a [`SubscribePlan`].
///
/// `copy_to` is set when the SUBSCRIBE is the body of a `COPY ... TO STDOUT`,
/// in which case the output is formatted for COPY.
pub fn plan_subscribe(
    scx: &StatementContext,
    SubscribeStatement {
        relation,
        options,
        as_of,
        up_to,
        output,
    }: SubscribeStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<Plan, PlanError> {
    // Resolve what we are subscribing to: a named item or an inline query.
    let (from, desc, scope) = match relation {
        SubscribeRelation::Name(name) => {
            let item = scx.get_item_by_resolved_name(&name)?;
            let Some(desc) = item.relation_desc() else {
                sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    item.item_type(),
                );
            };
            let item_name = match name {
                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
                _ => None,
            };
            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
            (
                SubscribeFrom::Id(item.global_id()),
                desc.into_owned(),
                scope,
            )
        }
        SubscribeRelation::Query(query) => {
            #[allow(deprecated)] let query::PlannedRootQuery {
                mut expr,
                desc,
                finishing,
                scope,
            } = query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
            // Bind any $n parameters now; SUBSCRIBE queries cannot defer them.
            expr.bind_parameters(scx, QueryLifetime::Subscribe, params)?;
            let query = query::PlannedRootQuery {
                expr,
                desc,
                finishing,
                scope,
            };
            // The Subscribe lifetime must not produce a real RowSetFinishing.
            assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
                &query.finishing,
                query.desc.arity()
            ));
            let desc = query.desc.clone();
            (
                SubscribeFrom::Query {
                    expr: query.expr,
                    desc: query.desc,
                },
                desc,
                query.scope,
            )
        }
    };

    // Resolve AS OF / UP TO bounds.
    let when = query::plan_as_of(scx, as_of)?;
    let up_to = up_to
        .map(|up_to| plan_as_of_or_up_to(scx, up_to))
        .transpose()?;

    // Expression context for planning key/order-by expressions below: plain
    // column references only (no aggregates or window functions).
    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
    let ecx = ExprContext {
        qcx: &qcx,
        name: "",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
    let output = match output {
        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
        SubscribeOutput::EnvelopeUpsert { key_columns } => {
            // KEY columns are planned as a synthetic ORDER BY; any expression
            // needing a projection (map_exprs) means a non-column key.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE UPSERT KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
            }
            plan::SubscribeOutput::EnvelopeUpsert {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::EnvelopeDebezium { key_columns } => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
            // Same KEY handling as EnvelopeUpsert, behind a feature flag.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE DEBEZIUM KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
            }
            plan::SubscribeOutput::EnvelopeDebezium {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
            // `mz_diff` is addressable in the ORDER BY, so prepend it to the
            // column list (shifting the data columns by one).
            let mz_diff = "mz_diff".into();
            let output_columns = std::iter::once((0, &mz_diff))
                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
                .collect_vec();
            match query::plan_order_by_exprs(
                &ExprContext {
                    name: "WITHIN TIMESTAMP ORDER BY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            ) {
                // An unknown column named exactly `mz_diff` gets the dedicated
                // error rather than the generic unknown-column one.
                Err(PlanError::UnknownColumn {
                    table: None,
                    column,
                    similar: _,
                }) if &column == &mz_diff => {
                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                }
                Err(e) => return Err(e),
                Ok((order_by, map_exprs)) => {
                    if !map_exprs.is_empty() {
                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                    }

                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
                }
            }
        }
    };

    let SubscribeOptionExtracted {
        progress, snapshot, ..
    } = options.try_into()?;
    Ok(Plan::Subscribe(SubscribePlan {
        from,
        when,
        up_to,
        // SNAPSHOT defaults to true, PROGRESS to false.
        with_snapshot: snapshot.unwrap_or(true),
        copy_to,
        emit_progress: progress.unwrap_or(false),
        output,
    }))
}
1794
1795pub fn describe_copy_from_table(
1796 scx: &StatementContext,
1797 table_name: <Aug as AstInfo>::ItemName,
1798 columns: Vec<Ident>,
1799) -> Result<StatementDesc, PlanError> {
1800 let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1801 Ok(StatementDesc::new(Some(desc)))
1802}
1803
1804pub fn describe_copy_item(
1805 scx: &StatementContext,
1806 object_name: <Aug as AstInfo>::ItemName,
1807 columns: Vec<Ident>,
1808) -> Result<StatementDesc, PlanError> {
1809 let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1810 Ok(StatementDesc::new(Some(desc)))
1811}
1812
1813pub fn describe_copy(
1814 scx: &StatementContext,
1815 CopyStatement {
1816 relation,
1817 direction,
1818 ..
1819 }: CopyStatement<Aug>,
1820) -> Result<StatementDesc, PlanError> {
1821 Ok(match (relation, direction) {
1822 (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1823 describe_copy_item(scx, name, columns)?
1824 }
1825 (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1826 describe_copy_from_table(scx, name, columns)?
1827 }
1828 (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1829 (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1830 }
1831 .with_is_copy())
1832}
1833
/// Plans `COPY (SELECT ...) TO <expr>`: writing query results to object
/// storage through an AWS connection, in CSV or Parquet format.
fn plan_copy_to_expr(
    scx: &StatementContext,
    select_plan: SelectPlan,
    desc: RelationDesc,
    to: &Expr<Aug>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // An AWS connection is mandatory for the expression form of COPY TO.
    let conn_id = match options.aws_connection {
        Some(conn_id) => CatalogItemId::from(conn_id),
        None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
    };
    let connection = scx.get_item(&conn_id).connection()?;

    match connection {
        mz_storage_types::connections::Connection::Aws(_) => {}
        _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
    }

    // Validate and convert the requested output format.
    let format = match format {
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            S3SinkFormat::PgCopy(CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            ))
        }
        CopyFormat::Parquet => {
            // Reject column types that cannot be represented in Arrow before
            // we commit to a Parquet sink.
            ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
            S3SinkFormat::Parquet
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
    };

    // Plan the destination expression as a string in an empty scope: no
    // aggregates, subqueries, parameters, or window functions are allowed.
    let mut to_expr = to.clone();
    transform_ast::transform(scx, &mut to_expr)?;
    let relation_type = RelationDesc::empty();
    let ecx = &ExprContext {
        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
        name: "COPY TO target",
        scope: &Scope::empty(),
        relation_type: relation_type.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };

    let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;

    // Enforce the supported MAX FILE SIZE range for S3 sinks.
    if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
        sql_bail!(
            "MAX FILE SIZE cannot be less than {}",
            MIN_S3_SINK_FILE_SIZE
        );
    }
    if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
        sql_bail!(
            "MAX FILE SIZE cannot be greater than {}",
            MAX_S3_SINK_FILE_SIZE
        );
    }

    Ok(Plan::CopyTo(CopyToPlan {
        select_plan,
        desc,
        to,
        connection: connection.to_owned(),
        connection_id: conn_id,
        format,
        max_file_size: options.max_file_size.as_bytes(),
    }))
}
1918
/// Plans `COPY ... FROM`: ingesting data into a table from STDIN, a URL, or
/// S3 (via an AWS connection), in TEXT, CSV, or Parquet format.
fn plan_copy_from(
    scx: &StatementContext,
    target: &CopyTarget<Aug>,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    // Rejects options that are only meaningful in CSV mode.
    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
        match option {
            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
            None => Ok(()),
        }
    }

    // Determine where the data comes from.
    let source = match target {
        CopyTarget::Stdin => CopyFromSource::Stdin,
        CopyTarget::Expr(from) => {
            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;

            // Plan the source expression as a string in an empty scope (no
            // aggregates, subqueries, parameters, or window functions).
            let mut from_expr = from.clone();
            transform_ast::transform(scx, &mut from_expr)?;
            let relation_type = RelationDesc::empty();
            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "COPY FROM target",
                scope: &Scope::empty(),
                relation_type: relation_type.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;

            // With an AWS connection the expression is an S3 URI; otherwise
            // it is treated as a plain URL.
            match options.aws_connection {
                Some(conn_id) => {
                    let conn_id = CatalogItemId::from(conn_id);

                    let connection = match scx.get_item(&conn_id).connection()? {
                        mz_storage_types::connections::Connection::Aws(conn) => conn,
                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
                    };

                    CopyFromSource::AwsS3 {
                        uri: from,
                        connection,
                        connection_id: conn_id,
                    }
                }
                None => CopyFromSource::Url(from),
            }
        }
        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
    };

    // Build the format-specific decoding parameters.
    let params = match format {
        CopyFormat::Text => {
            only_available_with_csv(options.quote, "quote")?;
            only_available_with_csv(options.escape, "escape")?;
            only_available_with_csv(options.header, "HEADER")?;
            // TEXT defaults: tab delimiter and `\N` for NULL, as in Postgres.
            let delimiter =
                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
            let null = match options.null {
                Some(null) => Cow::from(null),
                None => Cow::from("\\N"),
            };
            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
        }
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            )
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Parquet => CopyFormatParams::Parquet,
    };

    // FILES and PATTERN are mutually exclusive object filters.
    // NOTE(review): this arm fires when BOTH are given, but the message reads
    // like a missing-option error — confirm intended wording.
    let filter = match (options.files, options.pattern) {
        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
        (Some(files), None) => Some(CopyFromFilter::Files(files)),
        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
        (None, None) => None,
    };

    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
    }

    let table_name_string = table_name.full_name_str();

    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;

    let Some(mfp) = maybe_mfp else {
        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
    };

    Ok(Plan::CopyFrom(CopyFromPlan {
        target_id: id,
        target_name: table_name_string,
        source,
        columns,
        source_desc,
        mfp,
        params,
        filter,
    }))
}
2039
2040fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
2041 match v {
2042 Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
2043 Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
2044 None => Ok(None),
2045 }
2046}
2047
// Declares the WITH options accepted by COPY and generates the
// `CopyOptionExtracted` struct consumed by `plan_copy` (via `try_from`), with
// one typed field per option.
generate_extracted_config!(
    CopyOption,
    // FORMAT: text | csv | binary | parquet (parsed case-insensitively).
    (Format, String),
    // Field delimiter; must be a single one-byte character.
    (Delimiter, String),
    // String that represents NULL values (TEXT format defaults to "\N").
    (Null, String),
    // CSV escape character; must be a single one-byte character.
    (Escape, String),
    // CSV quote character; must be a single one-byte character.
    (Quote, String),
    // Whether the CSV data has a header row.
    (Header, bool),
    // AWS connection object — presumably used for S3 targets/sources; the
    // consuming code is outside this view, so confirm against the callers.
    (AwsConnection, with_options::Object),
    // Maximum output file size; defaults to 256 MB.
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    // Explicit list of files for COPY FROM; mutually exclusive with PATTERN.
    (Files, Vec<String>),
    // Glob-style pattern for COPY FROM; mutually exclusive with FILES.
    (Pattern, String)
);
2061
2062pub fn plan_copy(
2063 scx: &StatementContext,
2064 CopyStatement {
2065 relation,
2066 direction,
2067 target,
2068 options,
2069 }: CopyStatement<Aug>,
2070) -> Result<Plan, PlanError> {
2071 let options = CopyOptionExtracted::try_from(options)?;
2072 let format = options
2075 .format
2076 .as_ref()
2077 .map(|format| match format.to_lowercase().as_str() {
2078 "text" => Ok(CopyFormat::Text),
2079 "csv" => Ok(CopyFormat::Csv),
2080 "binary" => Ok(CopyFormat::Binary),
2081 "parquet" => Ok(CopyFormat::Parquet),
2082 _ => sql_bail!("unknown FORMAT: {}", format),
2083 })
2084 .transpose()?;
2085
2086 match (&direction, &target) {
2087 (CopyDirection::To, CopyTarget::Stdout) => {
2088 if options.delimiter.is_some() {
2089 sql_bail!("COPY TO does not support DELIMITER option yet");
2090 }
2091 if options.quote.is_some() {
2092 sql_bail!("COPY TO does not support QUOTE option yet");
2093 }
2094 if options.null.is_some() {
2095 sql_bail!("COPY TO does not support NULL option yet");
2096 }
2097 match relation {
2098 CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
2099 CopyRelation::Select(stmt) => Ok(plan_select(
2100 scx,
2101 stmt,
2102 &Params::empty(),
2103 Some(format.unwrap_or(CopyFormat::Text)),
2104 )?),
2105 CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
2106 scx,
2107 stmt,
2108 &Params::empty(),
2109 Some(format.unwrap_or(CopyFormat::Text)),
2110 )?),
2111 }
2112 }
2113 (CopyDirection::From, target) => match relation {
2114 CopyRelation::Named { name, columns } => plan_copy_from(
2115 scx,
2116 target,
2117 name,
2118 columns,
2119 format.unwrap_or(CopyFormat::Text),
2120 options,
2121 ),
2122 _ => sql_bail!("COPY FROM {} not supported", target),
2123 },
2124 (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
2125 if !scx.catalog.active_role_id().is_system() {
2130 scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
2131 }
2132
2133 let format = match format {
2134 Some(inner) => inner,
2135 _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
2136 };
2137
2138 let stmt = match relation {
2139 CopyRelation::Named { name, columns } => {
2140 if !columns.is_empty() {
2141 sql_bail!(
2143 "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
2144 );
2145 }
2146 let query = Query {
2148 ctes: CteBlock::empty(),
2149 body: SetExpr::Table(name),
2150 order_by: vec![],
2151 limit: None,
2152 offset: None,
2153 };
2154 SelectStatement { query, as_of: None }
2155 }
2156 CopyRelation::Select(stmt) => {
2157 if !stmt.query.order_by.is_empty() {
2158 sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
2159 }
2160 stmt
2161 }
2162 CopyRelation::Subscribe(_) => {
2163 sql_bail!("COPY {} {} not supported", direction, target)
2164 }
2165 };
2166
2167 let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
2168 plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
2169 }
2170 _ => sql_bail!("COPY {} {} not supported", direction, target),
2171 }
2172}