1use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19
20use mz_arrow_util::builder::ArrowBuilder;
21use mz_expr::visit::Visit;
22use mz_expr::{MirRelationExpr, RowSetFinishing};
23use mz_ore::num::NonNeg;
24use mz_ore::soft_panic_or_log;
25use mz_ore::str::separated;
26use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
27use mz_repr::adt::numeric::NumericMaxScale;
28use mz_repr::bytes::ByteSize;
29use mz_repr::explain::{ExplainConfig, ExplainFormat};
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::{CatalogItemId, Datum, RelationDesc, RelationType, Row, ScalarType};
32use mz_sql_parser::ast::{
33 CteBlock, ExplainAnalyzeComputationProperty, ExplainAnalyzeProperty, ExplainAnalyzeStatement,
34 ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
35 ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
36 SetExpr, SubscribeOutput, UnresolvedItemName,
37};
38use mz_sql_parser::ident;
39use mz_storage_types::sinks::{
40 KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
41 MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
42};
43
44use crate::ast::display::AstDisplay;
45use crate::ast::{
46 AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
47 DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
48 SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
49 UpdateStatement,
50};
51use crate::catalog::CatalogItemType;
52use crate::names::{Aug, ResolvedItemName};
53use crate::normalize;
54use crate::plan::query::{ExprContext, QueryLifetime, offset_into_value, plan_expr, plan_up_to};
55use crate::plan::scope::Scope;
56use crate::plan::statement::show::ShowSelect;
57use crate::plan::statement::{StatementContext, StatementDesc, ddl};
58use crate::plan::{
59 self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
60 ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
61};
62use crate::plan::{
63 CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
64 QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
65};
66use crate::plan::{CopyFromSource, with_options};
67use crate::session::vars::{self, ENABLE_COPY_FROM_REMOTE};
68
69pub fn describe_insert(
76 scx: &StatementContext,
77 InsertStatement {
78 table_name,
79 columns,
80 source,
81 returning,
82 }: InsertStatement<Aug>,
83) -> Result<StatementDesc, PlanError> {
84 let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
85 let desc = if returning.expr.is_empty() {
86 None
87 } else {
88 Some(returning.desc)
89 };
90 Ok(StatementDesc::new(desc))
91}
92
93pub fn plan_insert(
94 scx: &StatementContext,
95 InsertStatement {
96 table_name,
97 columns,
98 source,
99 returning,
100 }: InsertStatement<Aug>,
101 params: &Params,
102) -> Result<Plan, PlanError> {
103 let (id, mut expr, returning) =
104 query::plan_insert_query(scx, table_name, columns, source, returning)?;
105 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
106 let returning = returning
107 .expr
108 .into_iter()
109 .map(|mut expr| {
110 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
111 expr.lower_uncorrelated()
112 })
113 .collect::<Result<Vec<_>, _>>()?;
114
115 Ok(Plan::Insert(InsertPlan {
116 id,
117 values: expr,
118 returning,
119 }))
120}
121
122pub fn describe_delete(
123 scx: &StatementContext,
124 stmt: DeleteStatement<Aug>,
125) -> Result<StatementDesc, PlanError> {
126 query::plan_delete_query(scx, stmt)?;
127 Ok(StatementDesc::new(None))
128}
129
130pub fn plan_delete(
131 scx: &StatementContext,
132 stmt: DeleteStatement<Aug>,
133 params: &Params,
134) -> Result<Plan, PlanError> {
135 let rtw_plan = query::plan_delete_query(scx, stmt)?;
136 plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
137}
138
139pub fn describe_update(
140 scx: &StatementContext,
141 stmt: UpdateStatement<Aug>,
142) -> Result<StatementDesc, PlanError> {
143 query::plan_update_query(scx, stmt)?;
144 Ok(StatementDesc::new(None))
145}
146
147pub fn plan_update(
148 scx: &StatementContext,
149 stmt: UpdateStatement<Aug>,
150 params: &Params,
151) -> Result<Plan, PlanError> {
152 let rtw_plan = query::plan_update_query(scx, stmt)?;
153 plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
154}
155
156pub fn plan_read_then_write(
157 scx: &StatementContext,
158 kind: MutationKind,
159 params: &Params,
160 query::ReadThenWritePlan {
161 id,
162 mut selection,
163 finishing,
164 assignments,
165 }: query::ReadThenWritePlan,
166) -> Result<Plan, PlanError> {
167 selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
168 let mut assignments_outer = BTreeMap::new();
169 for (idx, mut set) in assignments {
170 set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
171 let set = set.lower_uncorrelated()?;
172 assignments_outer.insert(idx, set);
173 }
174
175 Ok(Plan::ReadThenWrite(ReadThenWritePlan {
176 id,
177 selection,
178 finishing,
179 assignments: assignments_outer,
180 kind,
181 returning: Vec::new(),
182 }))
183}
184
185pub fn describe_select(
186 scx: &StatementContext,
187 stmt: SelectStatement<Aug>,
188) -> Result<StatementDesc, PlanError> {
189 if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
190 return Ok(StatementDesc::new(Some(desc)));
191 }
192
193 let query::PlannedRootQuery { desc, .. } =
194 query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
195 Ok(StatementDesc::new(Some(desc)))
196}
197
198pub fn plan_select(
199 scx: &StatementContext,
200 select: SelectStatement<Aug>,
201 params: &Params,
202 copy_to: Option<CopyFormat>,
203) -> Result<Plan, PlanError> {
204 if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
205 return Ok(Plan::SideEffectingFunc(f));
206 }
207
208 let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
209 Ok(Plan::Select(plan))
210}
211
212fn plan_select_inner(
213 scx: &StatementContext,
214 select: SelectStatement<Aug>,
215 params: &Params,
216 copy_to: Option<CopyFormat>,
217) -> Result<(SelectPlan, RelationDesc), PlanError> {
218 let when = query::plan_as_of(scx, select.as_of.clone())?;
219 let lifetime = QueryLifetime::OneShot;
220 let query::PlannedRootQuery {
221 mut expr,
222 desc,
223 finishing,
224 scope: _,
225 } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
226 expr.bind_parameters(scx, lifetime, params)?;
227
228 expr.try_visit_mut_pre(&mut |expr| {
231 if let HirRelationExpr::TopK { offset, .. } = expr {
232 let offset_value = offset_into_value(offset.take())?;
233 *offset = HirScalarExpr::literal(Datum::Int64(offset_value), ScalarType::Int64);
234 }
235 Ok::<(), PlanError>(())
236 })?;
237 let limit = match finishing.limit {
247 None => None,
248 Some(mut limit) => {
249 limit.bind_parameters(scx, lifetime, params)?;
250 let Some(limit) = limit.as_literal() else {
252 sql_bail!(
253 "Top-level LIMIT must be a constant expression, got {}",
254 limit
255 )
256 };
257 match limit {
258 Datum::Null => None,
259 Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
260 _ => {
261 soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
262 sql_bail!("LIMIT must be a non-negative INT or NULL")
263 }
264 }
265 }
266 };
267 let offset = {
268 let mut offset = finishing.offset.clone();
269 offset.bind_parameters(scx, lifetime, params)?;
270 let offset = offset_into_value(offset.take())?;
271 offset
272 .try_into()
273 .expect("checked in offset_into_value that it is not negative")
274 };
275
276 let plan = SelectPlan {
277 source: expr,
278 when,
279 finishing: RowSetFinishing {
280 limit,
281 offset,
282 project: finishing.project,
283 order_by: finishing.order_by,
284 },
285 copy_to,
286 select: Some(Box::new(select)),
287 };
288
289 Ok((plan, desc))
290}
291
292pub fn describe_explain_plan(
293 scx: &StatementContext,
294 explain: ExplainPlanStatement<Aug>,
295) -> Result<StatementDesc, PlanError> {
296 let mut relation_desc = RelationDesc::builder();
297
298 match explain.stage() {
299 ExplainStage::RawPlan => {
300 let name = "Raw Plan";
301 relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
302 }
303 ExplainStage::DecorrelatedPlan => {
304 let name = "Decorrelated Plan";
305 relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
306 }
307 ExplainStage::LocalPlan => {
308 let name = "Locally Optimized Plan";
309 relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
310 }
311 ExplainStage::GlobalPlan => {
312 let name = "Optimized Plan";
313 relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
314 }
315 ExplainStage::PhysicalPlan => {
316 let name = "Physical Plan";
317 relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
318 }
319 ExplainStage::Trace => {
320 relation_desc = relation_desc
321 .with_column("Time", ScalarType::UInt64.nullable(false))
322 .with_column("Path", ScalarType::String.nullable(false))
323 .with_column("Plan", ScalarType::String.nullable(false));
324 }
325 ExplainStage::PlanInsights => {
326 let name = "Plan Insights";
327 relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
328 }
329 };
330 let relation_desc = relation_desc.finish();
331
332 Ok(
333 StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
334 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
335 _ => vec![],
336 }),
337 )
338}
339
340pub fn describe_explain_pushdown(
341 scx: &StatementContext,
342 statement: ExplainPushdownStatement<Aug>,
343) -> Result<StatementDesc, PlanError> {
344 let relation_desc = RelationDesc::builder()
345 .with_column("Source", ScalarType::String.nullable(false))
346 .with_column("Total Bytes", ScalarType::UInt64.nullable(false))
347 .with_column("Selected Bytes", ScalarType::UInt64.nullable(false))
348 .with_column("Total Parts", ScalarType::UInt64.nullable(false))
349 .with_column("Selected Parts", ScalarType::UInt64.nullable(false))
350 .finish();
351
352 Ok(
353 StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
354 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
355 _ => vec![],
356 }),
357 )
358}
359
360pub fn describe_explain_analyze(
361 _scx: &StatementContext,
362 statement: ExplainAnalyzeStatement<Aug>,
363) -> Result<StatementDesc, PlanError> {
364 if statement.as_sql {
365 let relation_desc = RelationDesc::builder()
366 .with_column("SQL", ScalarType::String.nullable(false))
367 .finish();
368 return Ok(StatementDesc::new(Some(relation_desc)));
369 }
370
371 match statement.properties {
372 ExplainAnalyzeProperty::Computation { properties, skew } => {
373 let mut relation_desc =
374 RelationDesc::builder().with_column("operator", ScalarType::String.nullable(false));
375
376 if skew {
377 relation_desc =
378 relation_desc.with_column("worker_id", ScalarType::UInt64.nullable(true));
379 }
380
381 let mut seen_properties = BTreeSet::new();
382 for property in properties {
383 if !seen_properties.insert(property) {
385 continue;
386 }
387
388 match property {
389 ExplainAnalyzeComputationProperty::Memory if skew => {
390 let numeric = ScalarType::Numeric { max_scale: None }.nullable(true);
391 relation_desc = relation_desc
392 .with_column("memory_ratio", numeric.clone())
393 .with_column("worker_memory", ScalarType::String.nullable(true))
394 .with_column("avg_memory", ScalarType::String.nullable(true))
395 .with_column("total_memory", ScalarType::String.nullable(true))
396 .with_column("records_ratio", numeric.clone())
397 .with_column("worker_records", numeric.clone())
398 .with_column("avg_records", numeric.clone())
399 .with_column("total_records", numeric);
400 }
401 ExplainAnalyzeComputationProperty::Memory => {
402 relation_desc = relation_desc
403 .with_column("total_memory", ScalarType::String.nullable(true))
404 .with_column(
405 "total_records",
406 ScalarType::Numeric { max_scale: None }.nullable(true),
407 );
408 }
409 ExplainAnalyzeComputationProperty::Cpu => {
410 if skew {
411 relation_desc = relation_desc
412 .with_column(
413 "cpu_ratio",
414 ScalarType::Numeric { max_scale: None }.nullable(true),
415 )
416 .with_column("worker_elapsed", ScalarType::Interval.nullable(true))
417 .with_column("avg_elapsed", ScalarType::Interval.nullable(true));
418 }
419 relation_desc = relation_desc
420 .with_column("total_elapsed", ScalarType::Interval.nullable(true));
421 }
422 }
423 }
424
425 let relation_desc = relation_desc.finish();
426 Ok(StatementDesc::new(Some(relation_desc)))
427 }
428 ExplainAnalyzeProperty::Hints => {
429 let relation_desc = RelationDesc::builder()
430 .with_column("operator", ScalarType::String.nullable(true))
431 .with_column("levels", ScalarType::Int64.nullable(true))
432 .with_column("to_cut", ScalarType::Int64.nullable(true))
433 .with_column("hint", ScalarType::Float64.nullable(true))
434 .with_column("savings", ScalarType::String.nullable(true))
435 .finish();
436 Ok(StatementDesc::new(Some(relation_desc)))
437 }
438 }
439}
440
441pub fn describe_explain_timestamp(
442 scx: &StatementContext,
443 ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
444) -> Result<StatementDesc, PlanError> {
445 let relation_desc = RelationDesc::builder()
446 .with_column("Timestamp", ScalarType::String.nullable(false))
447 .finish();
448
449 Ok(StatementDesc::new(Some(relation_desc))
450 .with_params(describe_select(scx, select)?.param_types))
451}
452
453pub fn describe_explain_schema(
454 _: &StatementContext,
455 ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
456) -> Result<StatementDesc, PlanError> {
457 let relation_desc = RelationDesc::builder()
458 .with_column("Schema", ScalarType::String.nullable(false))
459 .finish();
460 Ok(StatementDesc::new(Some(relation_desc)))
461}
462
// Options accepted in the `WITH (...)` clause of `EXPLAIN PLAN`, extracted
// into `ExplainPlanOptionExtracted` and then converted by the
// `TryFrom<ExplainPlanOptionExtracted> for ExplainConfig` impl. Each entry is
// `(option, field type, default)`. The `Option<bool>` options default to
// `None`, so an option the user did not specify can be told apart from an
// explicit `false`. The conversion either substitutes an environment-dependent
// default or forwards the `Option` as an optimizer feature override.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
505
506impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
507 type Error = PlanError;
508
509 fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
510 if v.raw {
513 v.raw_plans = true;
514 v.raw_syntax = true;
515 }
516
517 let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();
520
521 Ok(ExplainConfig {
522 arity: v.arity.unwrap_or(enable_on_prod),
523 cardinality: v.cardinality,
524 column_names: v.column_names,
525 filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
526 humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
527 join_impls: v.join_implementations,
528 keys: v.keys,
529 linear_chains: !v.raw_plans && v.linear_chains,
530 no_fast_path: v.no_fast_path,
531 no_notices: v.no_notices,
532 node_ids: v.node_identifiers,
533 non_negative: v.non_negative,
534 raw_plans: v.raw_plans,
535 raw_syntax: v.raw_syntax,
536 verbose_syntax: false,
537 redacted: v.redacted,
538 subtree_size: v.subtree_size,
539 equivalences: v.equivalences,
540 timing: v.timing,
541 types: v.types,
542 features: OptimizerFeatureOverrides {
544 enable_guard_subquery_tablefunc: Default::default(),
545 enable_eager_delta_joins: v.enable_eager_delta_joins,
546 enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
547 enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
548 enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
549 enable_consolidate_after_union_negate: Default::default(),
550 enable_reduce_mfp_fusion: Default::default(),
551 enable_cardinality_estimates: Default::default(),
552 persist_fast_path_limit: Default::default(),
553 reoptimize_imported_views: v.reoptimize_imported_views,
554 enable_reduce_reduction: Default::default(),
555 enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
556 enable_projection_pushdown_after_relation_cse: v
557 .enable_projection_pushdown_after_relation_cse,
558 enable_less_reduce_in_eqprop: Default::default(),
559 enable_dequadratic_eqprop_map: Default::default(),
560 enable_eq_classes_withholding_errors: Default::default(),
561 enable_fast_path_plan_insights: Default::default(),
562 },
563 })
564 }
565}
566
567fn plan_explainee(
568 scx: &StatementContext,
569 explainee: Explainee<Aug>,
570 params: &Params,
571) -> Result<plan::Explainee, PlanError> {
572 use crate::plan::ExplaineeStatement;
573
574 let is_replan = matches!(
575 explainee,
576 Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
577 );
578
579 let explainee = match explainee {
580 Explainee::View(name) | Explainee::ReplanView(name) => {
581 let item = scx.get_item_by_resolved_name(&name)?;
582 let item_type = item.item_type();
583 if item_type != CatalogItemType::View {
584 sql_bail!("Expected {name} to be a view, not a {item_type}");
585 }
586 match is_replan {
587 true => crate::plan::Explainee::ReplanView(item.id()),
588 false => crate::plan::Explainee::View(item.id()),
589 }
590 }
591 Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
592 let item = scx.get_item_by_resolved_name(&name)?;
593 let item_type = item.item_type();
594 if item_type != CatalogItemType::MaterializedView {
595 sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
596 }
597 match is_replan {
598 true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
599 false => crate::plan::Explainee::MaterializedView(item.id()),
600 }
601 }
602 Explainee::Index(name) | Explainee::ReplanIndex(name) => {
603 let item = scx.get_item_by_resolved_name(&name)?;
604 let item_type = item.item_type();
605 if item_type != CatalogItemType::Index {
606 sql_bail!("Expected {name} to be an index, not a {item_type}");
607 }
608 match is_replan {
609 true => crate::plan::Explainee::ReplanIndex(item.id()),
610 false => crate::plan::Explainee::Index(item.id()),
611 }
612 }
613 Explainee::Select(select, broken) => {
614 let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
615 crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
616 }
617 Explainee::CreateView(mut stmt, broken) => {
618 if stmt.if_exists != IfExistsBehavior::Skip {
619 stmt.if_exists = IfExistsBehavior::Skip;
624 } else {
625 sql_bail!(
626 "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
627 (the behavior is implied within the scope of an enclosing EXPLAIN)"
628 );
629 }
630
631 let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
632 sql_bail!("expected CreateViewPlan plan");
633 };
634
635 crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
636 }
637 Explainee::CreateMaterializedView(mut stmt, broken) => {
638 if stmt.if_exists != IfExistsBehavior::Skip {
639 stmt.if_exists = IfExistsBehavior::Skip;
644 } else {
645 sql_bail!(
646 "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
647 (the behavior is implied within the scope of an enclosing EXPLAIN)"
648 );
649 }
650
651 let Plan::CreateMaterializedView(plan) =
652 ddl::plan_create_materialized_view(scx, *stmt)?
653 else {
654 sql_bail!("expected CreateMaterializedViewPlan plan");
655 };
656
657 crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
658 broken,
659 plan,
660 })
661 }
662 Explainee::CreateIndex(mut stmt, broken) => {
663 if !stmt.if_not_exists {
664 stmt.if_not_exists = true;
667 } else {
668 sql_bail!(
669 "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
670 (the behavior is implied within the scope of an enclosing EXPLAIN)"
671 );
672 }
673
674 let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
675 sql_bail!("expected CreateIndexPlan plan");
676 };
677
678 crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
679 }
680 };
681
682 Ok(explainee)
683}
684
685pub fn plan_explain_plan(
686 scx: &StatementContext,
687 explain: ExplainPlanStatement<Aug>,
688 params: &Params,
689) -> Result<Plan, PlanError> {
690 let (format, verbose_syntax) = match explain.format() {
691 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
692 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
693 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
694 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
695 };
696 let stage = explain.stage();
697
698 let mut config = {
700 let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
701
702 if !scx.catalog.system_vars().persist_stats_filter_enabled() {
703 with_options.filter_pushdown = Some(false);
705 }
706
707 ExplainConfig::try_from(with_options)?
708 };
709 config.verbose_syntax = verbose_syntax;
710
711 let explainee = plan_explainee(scx, explain.explainee, params)?;
712
713 Ok(Plan::ExplainPlan(ExplainPlanPlan {
714 stage,
715 format,
716 config,
717 explainee,
718 }))
719}
720
721pub fn plan_explain_schema(
722 scx: &StatementContext,
723 explain_schema: ExplainSinkSchemaStatement<Aug>,
724) -> Result<Plan, PlanError> {
725 let ExplainSinkSchemaStatement {
726 schema_for,
727 format: _,
729 mut statement,
730 } = explain_schema;
731
732 statement.name = Some(UnresolvedItemName::qualified(&[
736 ident!("mz_catalog"),
737 ident!("mz_explain_schema"),
738 ]));
739
740 crate::pure::purify_create_sink_avro_doc_on_options(
741 scx.catalog,
742 *statement.from.item_id(),
743 &mut statement.format,
744 )?;
745
746 match ddl::plan_create_sink(scx, statement)? {
747 Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
748 StorageSinkConnection::Kafka(KafkaSinkConnection {
749 format:
750 KafkaSinkFormat {
751 key_format,
752 value_format:
753 KafkaSinkFormatType::Avro {
754 schema: value_schema,
755 ..
756 },
757 ..
758 },
759 ..
760 }) => {
761 let schema = match schema_for {
762 ExplainSinkSchemaFor::Key => key_format
763 .and_then(|f| match f {
764 KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
765 _ => None,
766 })
767 .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
768 ExplainSinkSchemaFor::Value => value_schema,
769 };
770
771 Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
772 sink_from: sink.from,
773 json_schema: schema,
774 }))
775 }
776 _ => bail_unsupported!(
777 "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
778 ),
779 },
780 _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
781 }
782}
783
784pub fn plan_explain_pushdown(
785 scx: &StatementContext,
786 statement: ExplainPushdownStatement<Aug>,
787 params: &Params,
788) -> Result<Plan, PlanError> {
789 scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
790 let explainee = plan_explainee(scx, statement.explainee, params)?;
791 Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
792}
793
794pub fn plan_explain_analyze(
795 scx: &StatementContext,
796 statement: ExplainAnalyzeStatement<Aug>,
797 params: &Params,
798) -> Result<Plan, PlanError> {
799 let explainee_name = statement
800 .explainee
801 .name()
802 .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
803 .full_name_str();
804 let explainee = plan_explainee(scx, statement.explainee, params)?;
805
806 match explainee {
807 plan::Explainee::Index(_index_id) => (),
808 plan::Explainee::MaterializedView(_item_id) => (),
809 _ => {
810 return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
811 }
812 };
813
814 let mut ctes = Vec::with_capacity(4); let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
829 let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
830 let mut predicates = vec![format!("mo.name = '{}'", explainee_name)];
831 let mut order_by = vec!["mlm.lir_id DESC"];
832
833 match statement.properties {
834 ExplainAnalyzeProperty::Computation { properties, skew } => {
835 let mut worker_id = None;
836 let mut seen_properties = BTreeSet::new();
837 for property in properties {
838 if !seen_properties.insert(property) {
840 continue;
841 }
842
843 match property {
844 ExplainAnalyzeComputationProperty::Memory => {
845 ctes.push((
846 "summary_memory",
847 r#"
848 SELECT mlm.global_id AS global_id,
849 mlm.lir_id AS lir_id,
850 SUM(mas.size) AS total_memory,
851 SUM(mas.records) AS total_records,
852 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
853 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
854 FROM mz_introspection.mz_lir_mapping mlm
855 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
856 ON (mlm.operator_id_start <= mas.operator_id AND mas.operator_id < mlm.operator_id_end)
857GROUP BY mlm.global_id, mlm.lir_id"#,
858 ));
859 from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");
860
861 if skew {
862 ctes.push((
863 "per_worker_memory",
864 r#"
865 SELECT mlm.global_id AS global_id,
866 mlm.lir_id AS lir_id,
867 mas.worker_id AS worker_id,
868 SUM(mas.size) AS worker_memory,
869 SUM(mas.records) AS worker_records
870 FROM mz_introspection.mz_lir_mapping mlm
871 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
872 ON (mlm.operator_id_start <= mas.operator_id AND mas.operator_id < mlm.operator_id_end)
873GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
874 ));
875 from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");
876
877 if let Some(worker_id) = worker_id {
878 predicates.push(format!("pwm.worker_id = {worker_id}"));
879 } else {
880 worker_id = Some("pwm.worker_id");
881 columns.push("pwm.worker_id AS worker_id");
882 order_by.push("worker_id");
883 }
884
885 columns.extend([
886 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
887 "pg_size_pretty(pwm.worker_memory) AS worker_memory",
888 "pg_size_pretty(sm.avg_memory) AS avg_memory",
889 "pg_size_pretty(sm.total_memory) AS total_memory",
890 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
891 "pwm.worker_records AS worker_records",
892 "sm.avg_records AS avg_records",
893 "sm.total_records AS total_records",
894 ]);
895 } else {
896 columns.extend([
897 "pg_size_pretty(sm.total_memory) AS total_memory",
898 "sm.total_records AS total_records",
899 ]);
900 }
901 }
902 ExplainAnalyzeComputationProperty::Cpu => {
903 ctes.push((
904 "summary_cpu",
905 r#"
906 SELECT mlm.global_id AS global_id,
907 mlm.lir_id AS lir_id,
908 SUM(mse.elapsed_ns) AS total_ns,
909 CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
910 FROM mz_introspection.mz_lir_mapping mlm
911 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
912 ON (mlm.operator_id_start <= mse.id AND mse.id < mlm.operator_id_end)
913GROUP BY mlm.global_id, mlm.lir_id"#,
914 ));
915 from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");
916
917 if skew {
918 ctes.push((
919 "per_worker_cpu",
920 r#"
921 SELECT mlm.global_id AS global_id,
922 mlm.lir_id AS lir_id,
923 mse.worker_id AS worker_id,
924 SUM(mse.elapsed_ns) AS worker_ns
925 FROM mz_introspection.mz_lir_mapping mlm
926 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
927 ON (mlm.operator_id_start <= mse.id AND mse.id < mlm.operator_id_end)
928GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
929 ));
930 from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");
931
932 if let Some(worker_id) = worker_id {
933 predicates.push(format!("pwc.worker_id = {worker_id}"));
934 } else {
935 worker_id = Some("pwc.worker_id");
936 columns.push("pwc.worker_id AS worker_id");
937 order_by.push("worker_id");
938 }
939
940 columns.extend([
941 "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
942 "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
943 "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
944 ]);
945 }
946 columns.push(
947 "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
948 );
949 }
950 }
951 }
952 }
953 ExplainAnalyzeProperty::Hints => {
954 columns.extend([
955 "megsa.levels AS levels",
956 "megsa.to_cut AS to_cut",
957 "megsa.hint AS hint",
958 "pg_size_pretty(savings) AS savings",
959 ]);
960 from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
961 "LEFT JOIN mz_introspection.mz_expected_group_size_advice megsa ON (megsa.dataflow_id = mdgi.id AND mlm.operator_id_start <= megsa.region_id AND megsa.region_id < mlm.operator_id_end)"]);
962 }
963 }
964
965 from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");
966
967 let ctes = if !ctes.is_empty() {
968 format!(
969 "WITH {}",
970 separated(
971 ",\n",
972 ctes.iter()
973 .map(|(name, defn)| format!("{name} AS ({defn})"))
974 )
975 )
976 } else {
977 String::new()
978 };
979 let columns = separated(", ", columns);
980 let from = separated(" ", from);
981 let predicates = separated(" AND ", predicates);
982 let order_by = separated(", ", order_by);
983 let query = format!(
984 r#"{ctes}
985SELECT {columns}
986FROM {from}
987WHERE {predicates}
988ORDER BY {order_by}"#
989 );
990
991 if statement.as_sql {
992 let rows = vec![Row::pack_slice(&[Datum::String(
993 &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
994 PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
995 })?,
996 )])];
997 let typ = RelationType::new(vec![ScalarType::String.nullable(false)]);
998
999 Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1000 } else {
1001 let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1002 show_select.plan()
1003 }
1004}
1005
1006pub fn plan_explain_timestamp(
1007 scx: &StatementContext,
1008 explain: ExplainTimestampStatement<Aug>,
1009) -> Result<Plan, PlanError> {
1010 let (format, _verbose_syntax) = match explain.format() {
1011 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1012 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1013 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1014 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1015 };
1016
1017 let raw_plan = {
1018 let query::PlannedRootQuery {
1019 expr: raw_plan,
1020 desc: _,
1021 finishing: _,
1022 scope: _,
1023 } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1024 if raw_plan.contains_parameters()? {
1025 return Err(PlanError::ParameterNotAllowed(
1026 "EXPLAIN TIMESTAMP".to_string(),
1027 ));
1028 }
1029
1030 raw_plan
1031 };
1032 let when = query::plan_as_of(scx, explain.select.as_of)?;
1033
1034 Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1035 format,
1036 raw_plan,
1037 when,
1038 }))
1039}
1040
1041#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."]
1044pub fn plan_query(
1045 scx: &StatementContext,
1046 query: Query<Aug>,
1047 params: &Params,
1048 lifetime: QueryLifetime,
1049) -> Result<query::PlannedRootQuery<MirRelationExpr>, PlanError> {
1050 let query::PlannedRootQuery {
1051 mut expr,
1052 desc,
1053 finishing,
1054 scope,
1055 } = query::plan_root_query(scx, query, lifetime)?;
1056 expr.bind_parameters(scx, lifetime, params)?;
1057
1058 Ok(query::PlannedRootQuery {
1059 expr: expr.lower(scx.catalog.system_vars(), None)?,
1061 desc,
1062 finishing,
1063 scope,
1064 })
1065}
1066
// Options accepted in `SUBSCRIBE ... WITH (...)`. This generates
// `SubscribeOptionExtracted`, whose `snapshot` / `progress` fields are
// `Option<bool>`; callers apply the defaults themselves.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1068
1069pub fn describe_subscribe(
1070 scx: &StatementContext,
1071 stmt: SubscribeStatement<Aug>,
1072) -> Result<StatementDesc, PlanError> {
1073 let relation_desc = match stmt.relation {
1074 SubscribeRelation::Name(name) => {
1075 let item = scx.get_item_by_resolved_name(&name)?;
1076 item.desc(&scx.catalog.resolve_full_name(item.name()))?
1077 .into_owned()
1078 }
1079 SubscribeRelation::Query(query) => {
1080 let query::PlannedRootQuery { desc, .. } =
1081 query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
1082 desc
1083 }
1084 };
1085 let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
1086 let progress = progress.unwrap_or(false);
1087 let mut desc = RelationDesc::builder().with_column(
1088 "mz_timestamp",
1089 ScalarType::Numeric {
1090 max_scale: Some(NumericMaxScale::ZERO),
1091 }
1092 .nullable(false),
1093 );
1094 if progress {
1095 desc = desc.with_column("mz_progressed", ScalarType::Bool.nullable(false));
1096 }
1097
1098 let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
1099 match stmt.output {
1100 SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
1101 desc = desc.with_column("mz_diff", ScalarType::Int64.nullable(true));
1102 for (name, mut ty) in relation_desc.into_iter() {
1103 if progress {
1104 ty.nullable = true;
1105 }
1106 desc = desc.with_column(name, ty);
1107 }
1108 }
1109 SubscribeOutput::EnvelopeUpsert { key_columns }
1110 | SubscribeOutput::EnvelopeDebezium { key_columns } => {
1111 desc = desc.with_column("mz_state", ScalarType::String.nullable(true));
1112 let key_columns = key_columns
1113 .into_iter()
1114 .map(normalize::column_name)
1115 .collect_vec();
1116 let mut before_values_desc = RelationDesc::builder();
1117 let mut after_values_desc = RelationDesc::builder();
1118
1119 for column_name in &key_columns {
1121 let mut column_ty = relation_desc
1122 .get_by_name(column_name)
1123 .map(|(_pos, ty)| ty.clone())
1124 .ok_or_else(|| PlanError::UnknownColumn {
1125 table: None,
1126 column: column_name.clone(),
1127 similar: Box::new([]),
1128 })?;
1129 if progress {
1130 column_ty.nullable = true;
1131 }
1132 desc = desc.with_column(column_name, column_ty);
1133 }
1134
1135 for (mut name, mut ty) in relation_desc
1138 .into_iter()
1139 .filter(|(name, _ty)| !key_columns.contains(name))
1140 {
1141 ty.nullable = true;
1142 before_values_desc =
1143 before_values_desc.with_column(format!("before_{}", name), ty.clone());
1144 if debezium {
1145 name = format!("after_{}", name).into();
1146 }
1147 after_values_desc = after_values_desc.with_column(name, ty);
1148 }
1149
1150 if debezium {
1151 desc = desc.concat(before_values_desc);
1152 }
1153 desc = desc.concat(after_values_desc);
1154 }
1155 }
1156 Ok(StatementDesc::new(Some(desc.finish())))
1157}
1158
1159pub fn plan_subscribe(
1160 scx: &StatementContext,
1161 SubscribeStatement {
1162 relation,
1163 options,
1164 as_of,
1165 up_to,
1166 output,
1167 }: SubscribeStatement<Aug>,
1168 params: &Params,
1169 copy_to: Option<CopyFormat>,
1170) -> Result<Plan, PlanError> {
1171 let (from, desc, scope) = match relation {
1172 SubscribeRelation::Name(name) => {
1173 let entry = scx.get_item_by_resolved_name(&name)?;
1174 let desc = match entry.desc(&scx.catalog.resolve_full_name(entry.name())) {
1175 Ok(desc) => desc,
1176 Err(..) => sql_bail!(
1177 "'{}' cannot be subscribed to because it is a {}",
1178 name.full_name_str(),
1179 entry.item_type(),
1180 ),
1181 };
1182 let item_name = match name {
1183 ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
1184 _ => None,
1185 };
1186 let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
1187 (
1188 SubscribeFrom::Id(entry.global_id()),
1189 desc.into_owned(),
1190 scope,
1191 )
1192 }
1193 SubscribeRelation::Query(query) => {
1194 #[allow(deprecated)] let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?;
1196 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
1200 &query.finishing,
1201 query.desc.arity()
1202 ));
1203 let desc = query.desc.clone();
1204 (
1205 SubscribeFrom::Query {
1206 expr: query.expr,
1207 desc: query.desc,
1208 },
1209 desc,
1210 query.scope,
1211 )
1212 }
1213 };
1214
1215 let when = query::plan_as_of(scx, as_of)?;
1216 let up_to = up_to.map(|up_to| plan_up_to(scx, up_to)).transpose()?;
1217
1218 let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
1219 let ecx = ExprContext {
1220 qcx: &qcx,
1221 name: "",
1222 scope: &scope,
1223 relation_type: desc.typ(),
1224 allow_aggregates: false,
1225 allow_subqueries: true,
1226 allow_parameters: true,
1227 allow_windows: false,
1228 };
1229
1230 let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1231 let output = match output {
1232 SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
1233 SubscribeOutput::EnvelopeUpsert { key_columns } => {
1234 let order_by = key_columns
1235 .iter()
1236 .map(|ident| OrderByExpr {
1237 expr: Expr::Identifier(vec![ident.clone()]),
1238 asc: None,
1239 nulls_last: None,
1240 })
1241 .collect_vec();
1242 let (order_by, map_exprs) = query::plan_order_by_exprs(
1243 &ExprContext {
1244 name: "ENVELOPE UPSERT KEY clause",
1245 ..ecx
1246 },
1247 &order_by[..],
1248 &output_columns[..],
1249 )?;
1250 if !map_exprs.is_empty() {
1251 return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
1252 }
1253 plan::SubscribeOutput::EnvelopeUpsert {
1254 order_by_keys: order_by,
1255 }
1256 }
1257 SubscribeOutput::EnvelopeDebezium { key_columns } => {
1258 scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
1259 let order_by = key_columns
1260 .iter()
1261 .map(|ident| OrderByExpr {
1262 expr: Expr::Identifier(vec![ident.clone()]),
1263 asc: None,
1264 nulls_last: None,
1265 })
1266 .collect_vec();
1267 let (order_by, map_exprs) = query::plan_order_by_exprs(
1268 &ExprContext {
1269 name: "ENVELOPE DEBEZIUM KEY clause",
1270 ..ecx
1271 },
1272 &order_by[..],
1273 &output_columns[..],
1274 )?;
1275 if !map_exprs.is_empty() {
1276 return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
1277 }
1278 plan::SubscribeOutput::EnvelopeDebezium {
1279 order_by_keys: order_by,
1280 }
1281 }
1282 SubscribeOutput::WithinTimestampOrderBy { order_by } => {
1283 scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
1284 let mz_diff = "mz_diff".into();
1285 let output_columns = std::iter::once((0, &mz_diff))
1286 .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
1287 .collect_vec();
1288 match query::plan_order_by_exprs(
1289 &ExprContext {
1290 name: "WITHIN TIMESTAMP ORDER BY clause",
1291 ..ecx
1292 },
1293 &order_by[..],
1294 &output_columns[..],
1295 ) {
1296 Err(PlanError::UnknownColumn {
1297 table: None,
1298 column,
1299 similar: _,
1300 }) if &column == &mz_diff => {
1301 return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1304 }
1305 Err(e) => return Err(e),
1306 Ok((order_by, map_exprs)) => {
1307 if !map_exprs.is_empty() {
1308 return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1309 }
1310
1311 plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
1312 }
1313 }
1314 }
1315 };
1316
1317 let SubscribeOptionExtracted {
1318 progress, snapshot, ..
1319 } = options.try_into()?;
1320 Ok(Plan::Subscribe(SubscribePlan {
1321 from,
1322 when,
1323 up_to,
1324 with_snapshot: snapshot.unwrap_or(true),
1325 copy_to,
1326 emit_progress: progress.unwrap_or(false),
1327 output,
1328 }))
1329}
1330
1331pub fn describe_copy_from_table(
1332 scx: &StatementContext,
1333 table_name: <Aug as AstInfo>::ItemName,
1334 columns: Vec<Ident>,
1335) -> Result<StatementDesc, PlanError> {
1336 let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1337 Ok(StatementDesc::new(Some(desc)))
1338}
1339
1340pub fn describe_copy_item(
1341 scx: &StatementContext,
1342 object_name: <Aug as AstInfo>::ItemName,
1343 columns: Vec<Ident>,
1344) -> Result<StatementDesc, PlanError> {
1345 let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1346 Ok(StatementDesc::new(Some(desc)))
1347}
1348
1349pub fn describe_copy(
1350 scx: &StatementContext,
1351 CopyStatement {
1352 relation,
1353 direction,
1354 ..
1355 }: CopyStatement<Aug>,
1356) -> Result<StatementDesc, PlanError> {
1357 Ok(match (relation, direction) {
1358 (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1359 describe_copy_item(scx, name, columns)?
1360 }
1361 (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1362 describe_copy_from_table(scx, name, columns)?
1363 }
1364 (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1365 (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1366 }
1367 .with_is_copy())
1368}
1369
1370fn plan_copy_to_expr(
1371 scx: &StatementContext,
1372 select_plan: SelectPlan,
1373 desc: RelationDesc,
1374 to: &Expr<Aug>,
1375 format: CopyFormat,
1376 options: CopyOptionExtracted,
1377) -> Result<Plan, PlanError> {
1378 let conn_id = match options.aws_connection {
1379 Some(conn_id) => CatalogItemId::from(conn_id),
1380 None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1381 };
1382 let connection = scx.get_item(&conn_id).connection()?;
1383
1384 match connection {
1385 mz_storage_types::connections::Connection::Aws(_) => {}
1386 _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1387 }
1388
1389 let format = match format {
1390 CopyFormat::Csv => {
1391 let quote = extract_byte_param_value(options.quote, "quote")?;
1392 let escape = extract_byte_param_value(options.escape, "escape")?;
1393 let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1394 S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1395 CopyCsvFormatParams::try_new(
1396 delimiter,
1397 quote,
1398 escape,
1399 options.header,
1400 options.null,
1401 )
1402 .map_err(|e| sql_err!("{}", e))?,
1403 ))
1404 }
1405 CopyFormat::Parquet => {
1406 ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
1408 S3SinkFormat::Parquet
1409 }
1410 CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1411 CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1412 };
1413
1414 let mut to_expr = to.clone();
1416 transform_ast::transform(scx, &mut to_expr)?;
1417 let relation_type = RelationDesc::empty();
1418 let ecx = &ExprContext {
1419 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1420 name: "COPY TO target",
1421 scope: &Scope::empty(),
1422 relation_type: relation_type.typ(),
1423 allow_aggregates: false,
1424 allow_subqueries: false,
1425 allow_parameters: false,
1426 allow_windows: false,
1427 };
1428
1429 let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &ScalarType::String)?;
1430
1431 if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1432 sql_bail!(
1433 "MAX FILE SIZE cannot be less than {}",
1434 MIN_S3_SINK_FILE_SIZE
1435 );
1436 }
1437 if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1438 sql_bail!(
1439 "MAX FILE SIZE cannot be greater than {}",
1440 MAX_S3_SINK_FILE_SIZE
1441 );
1442 }
1443
1444 Ok(Plan::CopyTo(CopyToPlan {
1445 select_plan,
1446 desc,
1447 to,
1448 connection: connection.to_owned(),
1449 connection_id: conn_id,
1450 format,
1451 max_file_size: options.max_file_size.as_bytes(),
1452 }))
1453}
1454
/// Plans `COPY <table> [(<columns>)] FROM <target>` into a [`Plan::CopyFrom`].
///
/// `target` selects where rows are read from:
///
/// * `STDIN`;
/// * an expression planned as a `String`-typed scalar, which requires the
///   `ENABLE_COPY_FROM_REMOTE` feature flag. With an `AWS CONNECTION` option
///   the value is used as an S3 URI; otherwise it is treated as a URL.
///
/// `COPY ... FROM STDOUT` is never supported.
///
/// `format` and `options` determine the parsing parameters. The `FILES` /
/// `PATTERN` options optionally restrict which remote objects are read.
///
/// # Errors
///
/// Returns an error in any of these cases:
///
/// * an unsupported target or format;
/// * options that are invalid for the chosen format;
/// * both `FILES` and `PATTERN` are given;
/// * a filter is used with `STDIN`;
/// * the connection is not an AWS connection;
/// * `query::plan_copy_from` rejects the table or column list.
fn plan_copy_from(
    scx: &StatementContext,
    target: &CopyTarget<Aug>,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    format: CopyFormat,
    options: CopyOptionExtracted,
) -> Result<Plan, PlanError> {
    /// Fails if `option` was specified, since it is only valid with `FORMAT CSV`.
    fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
        match option {
            Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
            None => Ok(()),
        }
    }

    let source = match target {
        CopyTarget::Stdin => CopyFromSource::Stdin,
        CopyTarget::Expr(from) => {
            scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;

            // Plan the source location as a `String`-typed scalar in an empty
            // scope. Aggregates, subqueries, parameters and window functions
            // are all disallowed.
            let mut from_expr = from.clone();
            transform_ast::transform(scx, &mut from_expr)?;
            let relation_type = RelationDesc::empty();
            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "COPY FROM target",
                scope: &Scope::empty(),
                relation_type: relation_type.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &ScalarType::String)?;

            match options.aws_connection {
                Some(conn_id) => {
                    let conn_id = CatalogItemId::from(conn_id);

                    // Only AWS connections can be used to read from S3.
                    let connection = match scx.get_item(&conn_id).connection()? {
                        mz_storage_types::connections::Connection::Aws(conn) => conn,
                        _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
                    };

                    CopyFromSource::AwsS3 {
                        uri: from,
                        connection,
                        connection_id: conn_id,
                    }
                }
                // Without a connection, the expression is read as a plain URL.
                None => CopyFromSource::Url(from),
            }
        }
        CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
    };

    let params = match format {
        CopyFormat::Text => {
            // QUOTE, ESCAPE and HEADER are CSV-only. TEXT defaults to a tab
            // delimiter and `\N` as the NULL marker.
            only_available_with_csv(options.quote, "quote")?;
            only_available_with_csv(options.escape, "escape")?;
            only_available_with_csv(options.header, "HEADER")?;
            let delimiter =
                extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
            let null = match options.null {
                Some(null) => Cow::from(null),
                None => Cow::from("\\N"),
            };
            CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
        }
        CopyFormat::Csv => {
            let quote = extract_byte_param_value(options.quote, "quote")?;
            let escape = extract_byte_param_value(options.escape, "escape")?;
            let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
            CopyFormatParams::Csv(
                CopyCsvFormatParams::try_new(
                    delimiter,
                    quote,
                    escape,
                    options.header,
                    options.null,
                )
                .map_err(|e| sql_err!("{}", e))?,
            )
        }
        CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
        CopyFormat::Parquet => CopyFormatParams::Parquet,
    };

    // FILES and PATTERN are mutually exclusive ways to select remote objects.
    let filter = match (options.files, options.pattern) {
        (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
        (Some(files), None) => Some(CopyFromFilter::Files(files)),
        (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
        (None, None) => None,
    };

    // A filter only makes sense for remote sources, not STDIN.
    if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
        bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
    }

    let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;

    let Some(mfp) = maybe_mfp else {
        sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
    };

    Ok(Plan::CopyFrom(CopyFromPlan {
        id,
        source,
        columns,
        source_desc,
        mfp,
        params,
        filter,
    }))
}
1572
1573fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
1574 match v {
1575 Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
1576 Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
1577 None => Ok(None),
1578 }
1579}
1580
// Options accepted in `COPY ... WITH (...)`. This generates
// `CopyOptionExtracted`.
// - Options without a default become `Option<_>` fields.
// - `MAX FILE SIZE` defaults to 256 MB.
// - `AWS CONNECTION` names a catalog connection object.
// - `FILES` / `PATTERN` select remote objects for `COPY FROM`.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
1594
/// Plans a `COPY` statement by dispatching on its direction and target.
///
/// * `COPY (SELECT|SUBSCRIBE ...) TO STDOUT` is planned as a select or
///   subscribe with a copy format. The format defaults to `TEXT`.
/// * `COPY <table> FROM <target>` is delegated to `plan_copy_from`. The
///   format defaults to `TEXT`.
/// * `COPY <table>|(SELECT ...) TO <expr>` requires an explicit `FORMAT` and
///   is delegated to `plan_copy_to_expr`.
///
/// Every other combination is rejected.
///
/// # Errors
///
/// Returns an error in any of these cases:
///
/// * the format name is unknown;
/// * an option is unsupported for the chosen direction or target;
/// * the combination of direction, relation and target is unsupported;
/// * the delegated planner fails.
pub fn plan_copy(
    scx: &StatementContext,
    CopyStatement {
        relation,
        direction,
        target,
        options,
    }: CopyStatement<Aug>,
) -> Result<Plan, PlanError> {
    let options = CopyOptionExtracted::try_from(options)?;
    // `FORMAT` is matched case-insensitively. `None` means "not specified";
    // each branch below applies its own default or requirement.
    let format = options
        .format
        .as_ref()
        .map(|format| match format.to_lowercase().as_str() {
            "text" => Ok(CopyFormat::Text),
            "csv" => Ok(CopyFormat::Csv),
            "binary" => Ok(CopyFormat::Binary),
            "parquet" => Ok(CopyFormat::Parquet),
            _ => sql_bail!("unknown FORMAT: {}", format),
        })
        .transpose()?;

    match (&direction, &target) {
        (CopyDirection::To, CopyTarget::Stdout) => {
            // NOTE(review): only DELIMITER, QUOTE and NULL are rejected here.
            // Other options (e.g. ESCAPE, HEADER) are not checked in this
            // branch. Confirm whether they are honored downstream or silently
            // ignored.
            if options.delimiter.is_some() {
                sql_bail!("COPY TO does not support DELIMITER option yet");
            }
            if options.quote.is_some() {
                sql_bail!("COPY TO does not support QUOTE option yet");
            }
            if options.null.is_some() {
                sql_bail!("COPY TO does not support NULL option yet");
            }
            match relation {
                CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
                CopyRelation::Select(stmt) => Ok(plan_select(
                    scx,
                    stmt,
                    &Params::empty(),
                    Some(format.unwrap_or(CopyFormat::Text)),
                )?),
                CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
                    scx,
                    stmt,
                    &Params::empty(),
                    Some(format.unwrap_or(CopyFormat::Text)),
                )?),
            }
        }
        // COPY FROM accepts any target here; `plan_copy_from` decides which
        // targets are actually supported.
        (CopyDirection::From, target) => match relation {
            CopyRelation::Named { name, columns } => plan_copy_from(
                scx,
                target,
                name,
                columns,
                format.unwrap_or(CopyFormat::Text),
                options,
            ),
            _ => sql_bail!("COPY FROM {} not supported", target),
        },
        (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
            // The feature flag is only enforced for non-system roles.
            if !scx.catalog.active_role_id().is_system() {
                scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
            }

            let format = match format {
                Some(inner) => inner,
                _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
            };

            let stmt = match relation {
                // `COPY <table> TO <expr>` is rewritten as `COPY (TABLE <table>) TO <expr>`.
                CopyRelation::Named { name, columns } => {
                    if !columns.is_empty() {
                        sql_bail!(
                            "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
                        );
                    }
                    let query = Query {
                        ctes: CteBlock::empty(),
                        body: SetExpr::Table(name),
                        order_by: vec![],
                        limit: None,
                        offset: None,
                    };
                    SelectStatement { query, as_of: None }
                }
                CopyRelation::Select(stmt) => {
                    if !stmt.query.order_by.is_empty() {
                        sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
                    }
                    stmt
                }
                _ => sql_bail!("COPY {} {} not supported", direction, target),
            };

            let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
            plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
        }
        _ => sql_bail!("COPY {} {} not supported", direction, target),
    }
}