1use std::borrow::Cow;
16use std::collections::{BTreeMap, BTreeSet};
17
18use itertools::Itertools;
19
20use mz_arrow_util::builder::ArrowBuilder;
21use mz_expr::visit::Visit;
22use mz_expr::{MirRelationExpr, RowSetFinishing};
23use mz_ore::num::NonNeg;
24use mz_ore::soft_panic_or_log;
25use mz_ore::str::separated;
26use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
27use mz_repr::adt::numeric::NumericMaxScale;
28use mz_repr::bytes::ByteSize;
29use mz_repr::explain::{ExplainConfig, ExplainFormat};
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlRelationType, SqlScalarType};
32use mz_sql_parser::ast::{
33 CteBlock, ExplainAnalyzeComputationProperty, ExplainAnalyzeProperty, ExplainAnalyzeStatement,
34 ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement, ExplainSinkSchemaFor,
35 ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr, IfExistsBehavior, OrderByExpr,
36 SetExpr, SubscribeOutput, UnresolvedItemName,
37};
38use mz_sql_parser::ident;
39use mz_storage_types::sinks::{
40 KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
41 MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
42};
43
44use crate::ast::display::AstDisplay;
45use crate::ast::{
46 AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
47 DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
48 SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
49 UpdateStatement,
50};
51use crate::catalog::CatalogItemType;
52use crate::names::{Aug, ResolvedItemName};
53use crate::normalize;
54use crate::plan::query::{
55 ExprContext, QueryLifetime, offset_into_value, plan_as_of_or_up_to, plan_expr,
56};
57use crate::plan::scope::Scope;
58use crate::plan::statement::show::ShowSelect;
59use crate::plan::statement::{StatementContext, StatementDesc, ddl};
60use crate::plan::{
61 self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
62 ExplainTimestampPlan, HirRelationExpr, HirScalarExpr, side_effecting_func, transform_ast,
63};
64use crate::plan::{
65 CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
66 QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
67};
68use crate::plan::{CopyFromSource, with_options};
69use crate::session::vars::{self, ENABLE_COPY_FROM_REMOTE};
70
71pub fn describe_insert(
78 scx: &StatementContext,
79 InsertStatement {
80 table_name,
81 columns,
82 source,
83 returning,
84 }: InsertStatement<Aug>,
85) -> Result<StatementDesc, PlanError> {
86 let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
87 let desc = if returning.expr.is_empty() {
88 None
89 } else {
90 Some(returning.desc)
91 };
92 Ok(StatementDesc::new(desc))
93}
94
95pub fn plan_insert(
96 scx: &StatementContext,
97 InsertStatement {
98 table_name,
99 columns,
100 source,
101 returning,
102 }: InsertStatement<Aug>,
103 params: &Params,
104) -> Result<Plan, PlanError> {
105 let (id, mut expr, returning) =
106 query::plan_insert_query(scx, table_name, columns, source, returning)?;
107 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
108 let returning = returning
109 .expr
110 .into_iter()
111 .map(|mut expr| {
112 expr.bind_parameters(scx, QueryLifetime::OneShot, params)?;
113 expr.lower_uncorrelated()
114 })
115 .collect::<Result<Vec<_>, _>>()?;
116
117 Ok(Plan::Insert(InsertPlan {
118 id,
119 values: expr,
120 returning,
121 }))
122}
123
124pub fn describe_delete(
125 scx: &StatementContext,
126 stmt: DeleteStatement<Aug>,
127) -> Result<StatementDesc, PlanError> {
128 query::plan_delete_query(scx, stmt)?;
129 Ok(StatementDesc::new(None))
130}
131
132pub fn plan_delete(
133 scx: &StatementContext,
134 stmt: DeleteStatement<Aug>,
135 params: &Params,
136) -> Result<Plan, PlanError> {
137 let rtw_plan = query::plan_delete_query(scx, stmt)?;
138 plan_read_then_write(scx, MutationKind::Delete, params, rtw_plan)
139}
140
141pub fn describe_update(
142 scx: &StatementContext,
143 stmt: UpdateStatement<Aug>,
144) -> Result<StatementDesc, PlanError> {
145 query::plan_update_query(scx, stmt)?;
146 Ok(StatementDesc::new(None))
147}
148
149pub fn plan_update(
150 scx: &StatementContext,
151 stmt: UpdateStatement<Aug>,
152 params: &Params,
153) -> Result<Plan, PlanError> {
154 let rtw_plan = query::plan_update_query(scx, stmt)?;
155 plan_read_then_write(scx, MutationKind::Update, params, rtw_plan)
156}
157
158pub fn plan_read_then_write(
159 scx: &StatementContext,
160 kind: MutationKind,
161 params: &Params,
162 query::ReadThenWritePlan {
163 id,
164 mut selection,
165 finishing,
166 assignments,
167 }: query::ReadThenWritePlan,
168) -> Result<Plan, PlanError> {
169 selection.bind_parameters(scx, QueryLifetime::OneShot, params)?;
170 let mut assignments_outer = BTreeMap::new();
171 for (idx, mut set) in assignments {
172 set.bind_parameters(scx, QueryLifetime::OneShot, params)?;
173 let set = set.lower_uncorrelated()?;
174 assignments_outer.insert(idx, set);
175 }
176
177 Ok(Plan::ReadThenWrite(ReadThenWritePlan {
178 id,
179 selection,
180 finishing,
181 assignments: assignments_outer,
182 kind,
183 returning: Vec::new(),
184 }))
185}
186
187pub fn describe_select(
188 scx: &StatementContext,
189 stmt: SelectStatement<Aug>,
190) -> Result<StatementDesc, PlanError> {
191 if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
192 return Ok(StatementDesc::new(Some(desc)));
193 }
194
195 let query::PlannedRootQuery { desc, .. } =
196 query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
197 Ok(StatementDesc::new(Some(desc)))
198}
199
200pub fn plan_select(
201 scx: &StatementContext,
202 select: SelectStatement<Aug>,
203 params: &Params,
204 copy_to: Option<CopyFormat>,
205) -> Result<Plan, PlanError> {
206 if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
207 return Ok(Plan::SideEffectingFunc(f));
208 }
209
210 let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
211 Ok(Plan::Select(plan))
212}
213
214fn plan_select_inner(
215 scx: &StatementContext,
216 select: SelectStatement<Aug>,
217 params: &Params,
218 copy_to: Option<CopyFormat>,
219) -> Result<(SelectPlan, RelationDesc), PlanError> {
220 let when = query::plan_as_of(scx, select.as_of.clone())?;
221 let lifetime = QueryLifetime::OneShot;
222 let query::PlannedRootQuery {
223 mut expr,
224 desc,
225 finishing,
226 scope: _,
227 } = query::plan_root_query(scx, select.query.clone(), lifetime)?;
228 expr.bind_parameters(scx, lifetime, params)?;
229
230 expr.try_visit_mut_pre(&mut |expr| {
233 if let HirRelationExpr::TopK { offset, .. } = expr {
234 let offset_value = offset_into_value(offset.take())?;
235 *offset = HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64);
236 }
237 Ok::<(), PlanError>(())
238 })?;
239 let limit = match finishing.limit {
249 None => None,
250 Some(mut limit) => {
251 limit.bind_parameters(scx, lifetime, params)?;
252 let Some(limit) = limit.as_literal() else {
254 sql_bail!(
255 "Top-level LIMIT must be a constant expression, got {}",
256 limit
257 )
258 };
259 match limit {
260 Datum::Null => None,
261 Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
262 _ => {
263 soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
264 sql_bail!("LIMIT must be a non-negative INT or NULL")
265 }
266 }
267 }
268 };
269 let offset = {
270 let mut offset = finishing.offset.clone();
271 offset.bind_parameters(scx, lifetime, params)?;
272 let offset = offset_into_value(offset.take())?;
273 offset
274 .try_into()
275 .expect("checked in offset_into_value that it is not negative")
276 };
277
278 let plan = SelectPlan {
279 source: expr,
280 when,
281 finishing: RowSetFinishing {
282 limit,
283 offset,
284 project: finishing.project,
285 order_by: finishing.order_by,
286 },
287 copy_to,
288 select: Some(Box::new(select)),
289 };
290
291 Ok((plan, desc))
292}
293
294pub fn describe_explain_plan(
295 scx: &StatementContext,
296 explain: ExplainPlanStatement<Aug>,
297) -> Result<StatementDesc, PlanError> {
298 let mut relation_desc = RelationDesc::builder();
299
300 match explain.stage() {
301 ExplainStage::RawPlan => {
302 let name = "Raw Plan";
303 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
304 }
305 ExplainStage::DecorrelatedPlan => {
306 let name = "Decorrelated Plan";
307 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
308 }
309 ExplainStage::LocalPlan => {
310 let name = "Locally Optimized Plan";
311 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
312 }
313 ExplainStage::GlobalPlan => {
314 let name = "Optimized Plan";
315 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
316 }
317 ExplainStage::PhysicalPlan => {
318 let name = "Physical Plan";
319 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
320 }
321 ExplainStage::Trace => {
322 relation_desc = relation_desc
323 .with_column("Time", SqlScalarType::UInt64.nullable(false))
324 .with_column("Path", SqlScalarType::String.nullable(false))
325 .with_column("Plan", SqlScalarType::String.nullable(false));
326 }
327 ExplainStage::PlanInsights => {
328 let name = "Plan Insights";
329 relation_desc = relation_desc.with_column(name, SqlScalarType::String.nullable(false));
330 }
331 };
332 let relation_desc = relation_desc.finish();
333
334 Ok(
335 StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
336 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
337 _ => vec![],
338 }),
339 )
340}
341
342pub fn describe_explain_pushdown(
343 scx: &StatementContext,
344 statement: ExplainPushdownStatement<Aug>,
345) -> Result<StatementDesc, PlanError> {
346 let relation_desc = RelationDesc::builder()
347 .with_column("Source", SqlScalarType::String.nullable(false))
348 .with_column("Total Bytes", SqlScalarType::UInt64.nullable(false))
349 .with_column("Selected Bytes", SqlScalarType::UInt64.nullable(false))
350 .with_column("Total Parts", SqlScalarType::UInt64.nullable(false))
351 .with_column("Selected Parts", SqlScalarType::UInt64.nullable(false))
352 .finish();
353
354 Ok(
355 StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
356 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
357 _ => vec![],
358 }),
359 )
360}
361
362pub fn describe_explain_analyze(
363 _scx: &StatementContext,
364 statement: ExplainAnalyzeStatement<Aug>,
365) -> Result<StatementDesc, PlanError> {
366 if statement.as_sql {
367 let relation_desc = RelationDesc::builder()
368 .with_column("SQL", SqlScalarType::String.nullable(false))
369 .finish();
370 return Ok(StatementDesc::new(Some(relation_desc)));
371 }
372
373 match statement.properties {
374 ExplainAnalyzeProperty::Computation { properties, skew } => {
375 let mut relation_desc = RelationDesc::builder()
376 .with_column("operator", SqlScalarType::String.nullable(false));
377
378 if skew {
379 relation_desc =
380 relation_desc.with_column("worker_id", SqlScalarType::UInt64.nullable(true));
381 }
382
383 let mut seen_properties = BTreeSet::new();
384 for property in properties {
385 if !seen_properties.insert(property) {
387 continue;
388 }
389
390 match property {
391 ExplainAnalyzeComputationProperty::Memory if skew => {
392 let numeric = SqlScalarType::Numeric { max_scale: None }.nullable(true);
393 relation_desc = relation_desc
394 .with_column("memory_ratio", numeric.clone())
395 .with_column("worker_memory", SqlScalarType::String.nullable(true))
396 .with_column("avg_memory", SqlScalarType::String.nullable(true))
397 .with_column("total_memory", SqlScalarType::String.nullable(true))
398 .with_column("records_ratio", numeric.clone())
399 .with_column("worker_records", numeric.clone())
400 .with_column("avg_records", numeric.clone())
401 .with_column("total_records", numeric);
402 }
403 ExplainAnalyzeComputationProperty::Memory => {
404 relation_desc = relation_desc
405 .with_column("total_memory", SqlScalarType::String.nullable(true))
406 .with_column(
407 "total_records",
408 SqlScalarType::Numeric { max_scale: None }.nullable(true),
409 );
410 }
411 ExplainAnalyzeComputationProperty::Cpu => {
412 if skew {
413 relation_desc = relation_desc
414 .with_column(
415 "cpu_ratio",
416 SqlScalarType::Numeric { max_scale: None }.nullable(true),
417 )
418 .with_column(
419 "worker_elapsed",
420 SqlScalarType::Interval.nullable(true),
421 )
422 .with_column("avg_elapsed", SqlScalarType::Interval.nullable(true));
423 }
424 relation_desc = relation_desc
425 .with_column("total_elapsed", SqlScalarType::Interval.nullable(true));
426 }
427 }
428 }
429
430 let relation_desc = relation_desc.finish();
431 Ok(StatementDesc::new(Some(relation_desc)))
432 }
433 ExplainAnalyzeProperty::Hints => {
434 let relation_desc = RelationDesc::builder()
435 .with_column("operator", SqlScalarType::String.nullable(true))
436 .with_column("levels", SqlScalarType::Int64.nullable(true))
437 .with_column("to_cut", SqlScalarType::Int64.nullable(true))
438 .with_column("hint", SqlScalarType::Float64.nullable(true))
439 .with_column("savings", SqlScalarType::String.nullable(true))
440 .finish();
441 Ok(StatementDesc::new(Some(relation_desc)))
442 }
443 }
444}
445
446pub fn describe_explain_timestamp(
447 scx: &StatementContext,
448 ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
449) -> Result<StatementDesc, PlanError> {
450 let relation_desc = RelationDesc::builder()
451 .with_column("Timestamp", SqlScalarType::String.nullable(false))
452 .finish();
453
454 Ok(StatementDesc::new(Some(relation_desc))
455 .with_params(describe_select(scx, select)?.param_types))
456}
457
458pub fn describe_explain_schema(
459 _: &StatementContext,
460 ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
461) -> Result<StatementDesc, PlanError> {
462 let relation_desc = RelationDesc::builder()
463 .with_column("Schema", SqlScalarType::String.nullable(false))
464 .finish();
465 Ok(StatementDesc::new(Some(relation_desc)))
466}
467
// Declares the `EXPLAIN ... PLAN` options as `(name, type, default)` entries.
//
// NOTE(review): `generate_extracted_config!` is defined elsewhere; it
// presumably generates the `ExplainPlanOptionExtracted` type that the
// `TryFrom<ExplainPlanOptionExtracted> for ExplainConfig` impl in this file
// consumes, with one snake_case field per entry — confirm against the macro.
//
// Entries typed `Option<bool>` with `Default(None)` let the consumer tell
// "not specified" apart from an explicit `true`/`false`: the conversion to
// `ExplainConfig` either substitutes its own fallback (`arity`,
// `filter_pushdown`, `humanized_expressions`) or forwards the `Option`
// unchanged as an optimizer feature override.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
510
511impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
512 type Error = PlanError;
513
514 fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
515 if v.raw {
518 v.raw_plans = true;
519 v.raw_syntax = true;
520 }
521
522 let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();
525
526 Ok(ExplainConfig {
527 arity: v.arity.unwrap_or(enable_on_prod),
528 cardinality: v.cardinality,
529 column_names: v.column_names,
530 filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
531 humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
532 join_impls: v.join_implementations,
533 keys: v.keys,
534 linear_chains: !v.raw_plans && v.linear_chains,
535 no_fast_path: v.no_fast_path,
536 no_notices: v.no_notices,
537 node_ids: v.node_identifiers,
538 non_negative: v.non_negative,
539 raw_plans: v.raw_plans,
540 raw_syntax: v.raw_syntax,
541 verbose_syntax: false,
542 redacted: v.redacted,
543 subtree_size: v.subtree_size,
544 equivalences: v.equivalences,
545 timing: v.timing,
546 types: v.types,
547 features: OptimizerFeatureOverrides {
549 enable_guard_subquery_tablefunc: Default::default(),
550 enable_eager_delta_joins: v.enable_eager_delta_joins,
551 enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
552 enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
553 enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
554 enable_consolidate_after_union_negate: Default::default(),
555 enable_reduce_mfp_fusion: Default::default(),
556 enable_cardinality_estimates: Default::default(),
557 persist_fast_path_limit: Default::default(),
558 reoptimize_imported_views: v.reoptimize_imported_views,
559 enable_reduce_reduction: Default::default(),
560 enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
561 enable_projection_pushdown_after_relation_cse: v
562 .enable_projection_pushdown_after_relation_cse,
563 enable_less_reduce_in_eqprop: Default::default(),
564 enable_dequadratic_eqprop_map: Default::default(),
565 enable_eq_classes_withholding_errors: Default::default(),
566 enable_fast_path_plan_insights: Default::default(),
567 },
568 })
569 }
570}
571
572fn plan_explainee(
573 scx: &StatementContext,
574 explainee: Explainee<Aug>,
575 params: &Params,
576) -> Result<plan::Explainee, PlanError> {
577 use crate::plan::ExplaineeStatement;
578
579 let is_replan = matches!(
580 explainee,
581 Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
582 );
583
584 let explainee = match explainee {
585 Explainee::View(name) | Explainee::ReplanView(name) => {
586 let item = scx.get_item_by_resolved_name(&name)?;
587 let item_type = item.item_type();
588 if item_type != CatalogItemType::View {
589 sql_bail!("Expected {name} to be a view, not a {item_type}");
590 }
591 match is_replan {
592 true => crate::plan::Explainee::ReplanView(item.id()),
593 false => crate::plan::Explainee::View(item.id()),
594 }
595 }
596 Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
597 let item = scx.get_item_by_resolved_name(&name)?;
598 let item_type = item.item_type();
599 if item_type != CatalogItemType::MaterializedView {
600 sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
601 }
602 match is_replan {
603 true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
604 false => crate::plan::Explainee::MaterializedView(item.id()),
605 }
606 }
607 Explainee::Index(name) | Explainee::ReplanIndex(name) => {
608 let item = scx.get_item_by_resolved_name(&name)?;
609 let item_type = item.item_type();
610 if item_type != CatalogItemType::Index {
611 sql_bail!("Expected {name} to be an index, not a {item_type}");
612 }
613 match is_replan {
614 true => crate::plan::Explainee::ReplanIndex(item.id()),
615 false => crate::plan::Explainee::Index(item.id()),
616 }
617 }
618 Explainee::Select(select, broken) => {
619 let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
620 crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
621 }
622 Explainee::CreateView(mut stmt, broken) => {
623 if stmt.if_exists != IfExistsBehavior::Skip {
624 stmt.if_exists = IfExistsBehavior::Skip;
629 } else {
630 sql_bail!(
631 "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
632 (the behavior is implied within the scope of an enclosing EXPLAIN)"
633 );
634 }
635
636 let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt)? else {
637 sql_bail!("expected CreateViewPlan plan");
638 };
639
640 crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
641 }
642 Explainee::CreateMaterializedView(mut stmt, broken) => {
643 if stmt.if_exists != IfExistsBehavior::Skip {
644 stmt.if_exists = IfExistsBehavior::Skip;
649 } else {
650 sql_bail!(
651 "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
652 (the behavior is implied within the scope of an enclosing EXPLAIN)"
653 );
654 }
655
656 let Plan::CreateMaterializedView(plan) =
657 ddl::plan_create_materialized_view(scx, *stmt)?
658 else {
659 sql_bail!("expected CreateMaterializedViewPlan plan");
660 };
661
662 crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
663 broken,
664 plan,
665 })
666 }
667 Explainee::CreateIndex(mut stmt, broken) => {
668 if !stmt.if_not_exists {
669 stmt.if_not_exists = true;
672 } else {
673 sql_bail!(
674 "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
675 (the behavior is implied within the scope of an enclosing EXPLAIN)"
676 );
677 }
678
679 let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
680 sql_bail!("expected CreateIndexPlan plan");
681 };
682
683 crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
684 }
685 };
686
687 Ok(explainee)
688}
689
690pub fn plan_explain_plan(
691 scx: &StatementContext,
692 explain: ExplainPlanStatement<Aug>,
693 params: &Params,
694) -> Result<Plan, PlanError> {
695 let (format, verbose_syntax) = match explain.format() {
696 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
697 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
698 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
699 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
700 };
701 let stage = explain.stage();
702
703 let mut config = {
705 let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
706
707 if !scx.catalog.system_vars().persist_stats_filter_enabled() {
708 with_options.filter_pushdown = Some(false);
710 }
711
712 ExplainConfig::try_from(with_options)?
713 };
714 config.verbose_syntax = verbose_syntax;
715
716 let explainee = plan_explainee(scx, explain.explainee, params)?;
717
718 Ok(Plan::ExplainPlan(ExplainPlanPlan {
719 stage,
720 format,
721 config,
722 explainee,
723 }))
724}
725
726pub fn plan_explain_schema(
727 scx: &StatementContext,
728 explain_schema: ExplainSinkSchemaStatement<Aug>,
729) -> Result<Plan, PlanError> {
730 let ExplainSinkSchemaStatement {
731 schema_for,
732 format: _,
734 mut statement,
735 } = explain_schema;
736
737 statement.name = Some(UnresolvedItemName::qualified(&[
741 ident!("mz_catalog"),
742 ident!("mz_explain_schema"),
743 ]));
744
745 crate::pure::purify_create_sink_avro_doc_on_options(
746 scx.catalog,
747 *statement.from.item_id(),
748 &mut statement.format,
749 )?;
750
751 match ddl::plan_create_sink(scx, statement)? {
752 Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
753 StorageSinkConnection::Kafka(KafkaSinkConnection {
754 format:
755 KafkaSinkFormat {
756 key_format,
757 value_format:
758 KafkaSinkFormatType::Avro {
759 schema: value_schema,
760 ..
761 },
762 ..
763 },
764 ..
765 }) => {
766 let schema = match schema_for {
767 ExplainSinkSchemaFor::Key => key_format
768 .and_then(|f| match f {
769 KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
770 _ => None,
771 })
772 .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
773 ExplainSinkSchemaFor::Value => value_schema,
774 };
775
776 Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
777 sink_from: sink.from,
778 json_schema: schema,
779 }))
780 }
781 _ => bail_unsupported!(
782 "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
783 ),
784 },
785 _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
786 }
787}
788
789pub fn plan_explain_pushdown(
790 scx: &StatementContext,
791 statement: ExplainPushdownStatement<Aug>,
792 params: &Params,
793) -> Result<Plan, PlanError> {
794 scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
795 let explainee = plan_explainee(scx, statement.explainee, params)?;
796 Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
797}
798
799pub fn plan_explain_analyze(
800 scx: &StatementContext,
801 statement: ExplainAnalyzeStatement<Aug>,
802 params: &Params,
803) -> Result<Plan, PlanError> {
804 let explainee_name = statement
805 .explainee
806 .name()
807 .ok_or_else(|| sql_err!("EXPLAIN ANALYZE on anonymous dataflows",))?
808 .full_name_str();
809 let explainee = plan_explainee(scx, statement.explainee, params)?;
810
811 match explainee {
812 plan::Explainee::Index(_index_id) => (),
813 plan::Explainee::MaterializedView(_item_id) => (),
814 _ => {
815 return Err(sql_err!("EXPLAIN ANALYZE queries for this explainee type",));
816 }
817 };
818
819 let mut ctes = Vec::with_capacity(4); let mut columns = vec!["REPEAT(' ', nesting * 2) || operator AS operator"];
834 let mut from = vec!["mz_introspection.mz_lir_mapping mlm"];
835 let mut predicates = vec![format!("mo.name = '{}'", explainee_name)];
836 let mut order_by = vec!["mlm.lir_id DESC"];
837
838 match statement.properties {
839 ExplainAnalyzeProperty::Computation { properties, skew } => {
840 let mut worker_id = None;
841 let mut seen_properties = BTreeSet::new();
842 for property in properties {
843 if !seen_properties.insert(property) {
845 continue;
846 }
847
848 match property {
849 ExplainAnalyzeComputationProperty::Memory => {
850 ctes.push((
851 "summary_memory",
852 r#"
853 SELECT mlm.global_id AS global_id,
854 mlm.lir_id AS lir_id,
855 SUM(mas.size) AS total_memory,
856 SUM(mas.records) AS total_records,
857 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.size) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_memory,
858 CASE WHEN COUNT(DISTINCT mas.worker_id) <> 0 THEN SUM(mas.records) / COUNT(DISTINCT mas.worker_id) ELSE NULL END AS avg_records
859 FROM mz_introspection.mz_lir_mapping mlm
860 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
861 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
862 ON (mas.operator_id = valid_id)
863GROUP BY mlm.global_id, mlm.lir_id"#,
864 ));
865 from.push("LEFT JOIN summary_memory sm USING (global_id, lir_id)");
866
867 if skew {
868 ctes.push((
869 "per_worker_memory",
870 r#"
871 SELECT mlm.global_id AS global_id,
872 mlm.lir_id AS lir_id,
873 mas.worker_id AS worker_id,
874 SUM(mas.size) AS worker_memory,
875 SUM(mas.records) AS worker_records
876 FROM mz_introspection.mz_lir_mapping mlm
877 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
878 JOIN mz_introspection.mz_arrangement_sizes_per_worker mas
879 ON (mas.operator_id = valid_id)
880GROUP BY mlm.global_id, mlm.lir_id, mas.worker_id"#,
881 ));
882 from.push("LEFT JOIN per_worker_memory pwm USING (global_id, lir_id)");
883
884 if let Some(worker_id) = worker_id {
885 predicates.push(format!("pwm.worker_id = {worker_id}"));
886 } else {
887 worker_id = Some("pwm.worker_id");
888 columns.push("pwm.worker_id AS worker_id");
889 order_by.push("worker_id");
890 }
891
892 columns.extend([
893 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_memory <> 0 THEN ROUND(pwm.worker_memory / sm.avg_memory, 2) ELSE NULL END AS memory_ratio",
894 "pg_size_pretty(pwm.worker_memory) AS worker_memory",
895 "pg_size_pretty(sm.avg_memory) AS avg_memory",
896 "pg_size_pretty(sm.total_memory) AS total_memory",
897 "CASE WHEN pwm.worker_id IS NOT NULL AND sm.avg_records <> 0 THEN ROUND(pwm.worker_records / sm.avg_records, 2) ELSE NULL END AS records_ratio",
898 "pwm.worker_records AS worker_records",
899 "sm.avg_records AS avg_records",
900 "sm.total_records AS total_records",
901 ]);
902 } else {
903 columns.extend([
904 "pg_size_pretty(sm.total_memory) AS total_memory",
905 "sm.total_records AS total_records",
906 ]);
907 }
908 }
909 ExplainAnalyzeComputationProperty::Cpu => {
910 ctes.push((
911 "summary_cpu",
912 r#"
913 SELECT mlm.global_id AS global_id,
914 mlm.lir_id AS lir_id,
915 SUM(mse.elapsed_ns) AS total_ns,
916 CASE WHEN COUNT(DISTINCT mse.worker_id) <> 0 THEN SUM(mse.elapsed_ns) / COUNT(DISTINCT mse.worker_id) ELSE NULL END AS avg_ns
917 FROM mz_introspection.mz_lir_mapping mlm
918 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
919 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
920 ON (mse.id = valid_id)
921GROUP BY mlm.global_id, mlm.lir_id"#,
922 ));
923 from.push("LEFT JOIN summary_cpu sc USING (global_id, lir_id)");
924
925 if skew {
926 ctes.push((
927 "per_worker_cpu",
928 r#"
929 SELECT mlm.global_id AS global_id,
930 mlm.lir_id AS lir_id,
931 mse.worker_id AS worker_id,
932 SUM(mse.elapsed_ns) AS worker_ns
933 FROM mz_introspection.mz_lir_mapping mlm
934 CROSS JOIN generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id
935 JOIN mz_introspection.mz_scheduling_elapsed_per_worker mse
936 ON (mse.id = valid_id)
937GROUP BY mlm.global_id, mlm.lir_id, mse.worker_id"#,
938 ));
939 from.push("LEFT JOIN per_worker_cpu pwc USING (global_id, lir_id)");
940
941 if let Some(worker_id) = worker_id {
942 predicates.push(format!("pwc.worker_id = {worker_id}"));
943 } else {
944 worker_id = Some("pwc.worker_id");
945 columns.push("pwc.worker_id AS worker_id");
946 order_by.push("worker_id");
947 }
948
949 columns.extend([
950 "CASE WHEN pwc.worker_id IS NOT NULL AND sc.avg_ns <> 0 THEN ROUND(pwc.worker_ns / sc.avg_ns, 2) ELSE NULL END AS cpu_ratio",
951 "pwc.worker_ns / 1000 * '1 microsecond'::INTERVAL AS worker_elapsed",
952 "sc.avg_ns / 1000 * '1 microsecond'::INTERVAL AS avg_elapsed",
953 ]);
954 }
955 columns.push(
956 "sc.total_ns / 1000 * '1 microsecond'::INTERVAL AS total_elapsed",
957 );
958 }
959 }
960 }
961 }
962 ExplainAnalyzeProperty::Hints => {
963 columns.extend([
964 "megsa.levels AS levels",
965 "megsa.to_cut AS to_cut",
966 "megsa.hint AS hint",
967 "pg_size_pretty(megsa.savings) AS savings",
968 ]);
969 from.extend(["JOIN mz_introspection.mz_dataflow_global_ids mdgi ON (mlm.global_id = mdgi.global_id)",
970 "LEFT JOIN (generate_series((mlm.operator_id_start) :: int8, (mlm.operator_id_end - 1) :: int8) AS valid_id JOIN \
971 mz_introspection.mz_expected_group_size_advice megsa ON (megsa.region_id = valid_id)) ON (megsa.dataflow_id = mdgi.id)"]);
972 }
973 }
974
975 from.push("JOIN mz_introspection.mz_mappable_objects mo ON (mlm.global_id = mo.global_id)");
976
977 let ctes = if !ctes.is_empty() {
978 format!(
979 "WITH {}",
980 separated(
981 ",\n",
982 ctes.iter()
983 .map(|(name, defn)| format!("{name} AS ({defn})"))
984 )
985 )
986 } else {
987 String::new()
988 };
989 let columns = separated(", ", columns);
990 let from = separated(" ", from);
991 let predicates = separated(" AND ", predicates);
992 let order_by = separated(", ", order_by);
993 let query = format!(
994 r#"{ctes}
995SELECT {columns}
996FROM {from}
997WHERE {predicates}
998ORDER BY {order_by}"#
999 );
1000
1001 if statement.as_sql {
1002 let rows = vec![Row::pack_slice(&[Datum::String(
1003 &mz_sql_pretty::pretty_str_simple(&query, 80).map_err(|e| {
1004 PlanError::Unstructured(format!("internal error parsing our own SQL: {e}"))
1005 })?,
1006 )])];
1007 let typ = SqlRelationType::new(vec![SqlScalarType::String.nullable(false)]);
1008
1009 Ok(Plan::Select(SelectPlan::immediate(rows, typ)))
1010 } else {
1011 let (show_select, _resolved_ids) = ShowSelect::new_from_bare_query(scx, query)?;
1012 show_select.plan()
1013 }
1014}
1015
1016pub fn plan_explain_timestamp(
1017 scx: &StatementContext,
1018 explain: ExplainTimestampStatement<Aug>,
1019) -> Result<Plan, PlanError> {
1020 let (format, _verbose_syntax) = match explain.format() {
1021 mz_sql_parser::ast::ExplainFormat::Text => (ExplainFormat::Text, false),
1022 mz_sql_parser::ast::ExplainFormat::VerboseText => (ExplainFormat::Text, true),
1023 mz_sql_parser::ast::ExplainFormat::Json => (ExplainFormat::Json, false),
1024 mz_sql_parser::ast::ExplainFormat::Dot => (ExplainFormat::Dot, false),
1025 };
1026
1027 let raw_plan = {
1028 let query::PlannedRootQuery {
1029 expr: raw_plan,
1030 desc: _,
1031 finishing: _,
1032 scope: _,
1033 } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
1034 if raw_plan.contains_parameters()? {
1035 return Err(PlanError::ParameterNotAllowed(
1036 "EXPLAIN TIMESTAMP".to_string(),
1037 ));
1038 }
1039
1040 raw_plan
1041 };
1042 let when = query::plan_as_of(scx, explain.select.as_of)?;
1043
1044 Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
1045 format,
1046 raw_plan,
1047 when,
1048 }))
1049}
1050
1051#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."]
1054pub fn plan_query(
1055 scx: &StatementContext,
1056 query: Query<Aug>,
1057 params: &Params,
1058 lifetime: QueryLifetime,
1059) -> Result<query::PlannedRootQuery<MirRelationExpr>, PlanError> {
1060 let query::PlannedRootQuery {
1061 mut expr,
1062 desc,
1063 finishing,
1064 scope,
1065 } = query::plan_root_query(scx, query, lifetime)?;
1066 expr.bind_parameters(scx, lifetime, params)?;
1067
1068 Ok(query::PlannedRootQuery {
1069 expr: expr.lower(scx.catalog.system_vars(), None)?,
1071 desc,
1072 finishing,
1073 scope,
1074 })
1075}
1076
// Generates `SubscribeOptionExtracted`, the typed view of a `SUBSCRIBE`
// statement's options that `describe_subscribe` and `plan_subscribe` obtain
// via `try_into`. Both options are read as optional booleans: the planner
// treats a missing `Snapshot` as `true` and a missing `Progress` as `false`.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
1078
1079pub fn describe_subscribe(
1080 scx: &StatementContext,
1081 stmt: SubscribeStatement<Aug>,
1082) -> Result<StatementDesc, PlanError> {
1083 let relation_desc = match stmt.relation {
1084 SubscribeRelation::Name(name) => {
1085 let item = scx.get_item_by_resolved_name(&name)?;
1086 item.desc(&scx.catalog.resolve_full_name(item.name()))?
1087 .into_owned()
1088 }
1089 SubscribeRelation::Query(query) => {
1090 let query::PlannedRootQuery { desc, .. } =
1091 query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
1092 desc
1093 }
1094 };
1095 let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
1096 let progress = progress.unwrap_or(false);
1097 let mut desc = RelationDesc::builder().with_column(
1098 "mz_timestamp",
1099 SqlScalarType::Numeric {
1100 max_scale: Some(NumericMaxScale::ZERO),
1101 }
1102 .nullable(false),
1103 );
1104 if progress {
1105 desc = desc.with_column("mz_progressed", SqlScalarType::Bool.nullable(false));
1106 }
1107
1108 let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
1109 match stmt.output {
1110 SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
1111 desc = desc.with_column("mz_diff", SqlScalarType::Int64.nullable(true));
1112 for (name, mut ty) in relation_desc.into_iter() {
1113 if progress {
1114 ty.nullable = true;
1115 }
1116 desc = desc.with_column(name, ty);
1117 }
1118 }
1119 SubscribeOutput::EnvelopeUpsert { key_columns }
1120 | SubscribeOutput::EnvelopeDebezium { key_columns } => {
1121 desc = desc.with_column("mz_state", SqlScalarType::String.nullable(true));
1122 let key_columns = key_columns
1123 .into_iter()
1124 .map(normalize::column_name)
1125 .collect_vec();
1126 let mut before_values_desc = RelationDesc::builder();
1127 let mut after_values_desc = RelationDesc::builder();
1128
1129 for column_name in &key_columns {
1131 let mut column_ty = relation_desc
1132 .get_by_name(column_name)
1133 .map(|(_pos, ty)| ty.clone())
1134 .ok_or_else(|| PlanError::UnknownColumn {
1135 table: None,
1136 column: column_name.clone(),
1137 similar: Box::new([]),
1138 })?;
1139 if progress {
1140 column_ty.nullable = true;
1141 }
1142 desc = desc.with_column(column_name, column_ty);
1143 }
1144
1145 for (mut name, mut ty) in relation_desc
1148 .into_iter()
1149 .filter(|(name, _ty)| !key_columns.contains(name))
1150 {
1151 ty.nullable = true;
1152 before_values_desc =
1153 before_values_desc.with_column(format!("before_{}", name), ty.clone());
1154 if debezium {
1155 name = format!("after_{}", name).into();
1156 }
1157 after_values_desc = after_values_desc.with_column(name, ty);
1158 }
1159
1160 if debezium {
1161 desc = desc.concat(before_values_desc);
1162 }
1163 desc = desc.concat(after_values_desc);
1164 }
1165 }
1166 Ok(StatementDesc::new(Some(desc.finish())))
1167}
1168
1169pub fn plan_subscribe(
1170 scx: &StatementContext,
1171 SubscribeStatement {
1172 relation,
1173 options,
1174 as_of,
1175 up_to,
1176 output,
1177 }: SubscribeStatement<Aug>,
1178 params: &Params,
1179 copy_to: Option<CopyFormat>,
1180) -> Result<Plan, PlanError> {
1181 let (from, desc, scope) = match relation {
1182 SubscribeRelation::Name(name) => {
1183 let entry = scx.get_item_by_resolved_name(&name)?;
1184 let desc = match entry.desc(&scx.catalog.resolve_full_name(entry.name())) {
1185 Ok(desc) => desc,
1186 Err(..) => sql_bail!(
1187 "'{}' cannot be subscribed to because it is a {}",
1188 name.full_name_str(),
1189 entry.item_type(),
1190 ),
1191 };
1192 let item_name = match name {
1193 ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
1194 _ => None,
1195 };
1196 let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
1197 (
1198 SubscribeFrom::Id(entry.global_id()),
1199 desc.into_owned(),
1200 scope,
1201 )
1202 }
1203 SubscribeRelation::Query(query) => {
1204 #[allow(deprecated)] let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?;
1206 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
1210 &query.finishing,
1211 query.desc.arity()
1212 ));
1213 let desc = query.desc.clone();
1214 (
1215 SubscribeFrom::Query {
1216 expr: query.expr,
1217 desc: query.desc,
1218 },
1219 desc,
1220 query.scope,
1221 )
1222 }
1223 };
1224
1225 let when = query::plan_as_of(scx, as_of)?;
1226 let up_to = up_to
1227 .map(|up_to| plan_as_of_or_up_to(scx, up_to))
1228 .transpose()?;
1229
1230 let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
1231 let ecx = ExprContext {
1232 qcx: &qcx,
1233 name: "",
1234 scope: &scope,
1235 relation_type: desc.typ(),
1236 allow_aggregates: false,
1237 allow_subqueries: true,
1238 allow_parameters: true,
1239 allow_windows: false,
1240 };
1241
1242 let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1243 let output = match output {
1244 SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
1245 SubscribeOutput::EnvelopeUpsert { key_columns } => {
1246 let order_by = key_columns
1247 .iter()
1248 .map(|ident| OrderByExpr {
1249 expr: Expr::Identifier(vec![ident.clone()]),
1250 asc: None,
1251 nulls_last: None,
1252 })
1253 .collect_vec();
1254 let (order_by, map_exprs) = query::plan_order_by_exprs(
1255 &ExprContext {
1256 name: "ENVELOPE UPSERT KEY clause",
1257 ..ecx
1258 },
1259 &order_by[..],
1260 &output_columns[..],
1261 )?;
1262 if !map_exprs.is_empty() {
1263 return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
1264 }
1265 plan::SubscribeOutput::EnvelopeUpsert {
1266 order_by_keys: order_by,
1267 }
1268 }
1269 SubscribeOutput::EnvelopeDebezium { key_columns } => {
1270 scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
1271 let order_by = key_columns
1272 .iter()
1273 .map(|ident| OrderByExpr {
1274 expr: Expr::Identifier(vec![ident.clone()]),
1275 asc: None,
1276 nulls_last: None,
1277 })
1278 .collect_vec();
1279 let (order_by, map_exprs) = query::plan_order_by_exprs(
1280 &ExprContext {
1281 name: "ENVELOPE DEBEZIUM KEY clause",
1282 ..ecx
1283 },
1284 &order_by[..],
1285 &output_columns[..],
1286 )?;
1287 if !map_exprs.is_empty() {
1288 return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
1289 }
1290 plan::SubscribeOutput::EnvelopeDebezium {
1291 order_by_keys: order_by,
1292 }
1293 }
1294 SubscribeOutput::WithinTimestampOrderBy { order_by } => {
1295 scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
1296 let mz_diff = "mz_diff".into();
1297 let output_columns = std::iter::once((0, &mz_diff))
1298 .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
1299 .collect_vec();
1300 match query::plan_order_by_exprs(
1301 &ExprContext {
1302 name: "WITHIN TIMESTAMP ORDER BY clause",
1303 ..ecx
1304 },
1305 &order_by[..],
1306 &output_columns[..],
1307 ) {
1308 Err(PlanError::UnknownColumn {
1309 table: None,
1310 column,
1311 similar: _,
1312 }) if &column == &mz_diff => {
1313 return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1316 }
1317 Err(e) => return Err(e),
1318 Ok((order_by, map_exprs)) => {
1319 if !map_exprs.is_empty() {
1320 return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
1321 }
1322
1323 plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
1324 }
1325 }
1326 }
1327 };
1328
1329 let SubscribeOptionExtracted {
1330 progress, snapshot, ..
1331 } = options.try_into()?;
1332 Ok(Plan::Subscribe(SubscribePlan {
1333 from,
1334 when,
1335 up_to,
1336 with_snapshot: snapshot.unwrap_or(true),
1337 copy_to,
1338 emit_progress: progress.unwrap_or(false),
1339 output,
1340 }))
1341}
1342
1343pub fn describe_copy_from_table(
1344 scx: &StatementContext,
1345 table_name: <Aug as AstInfo>::ItemName,
1346 columns: Vec<Ident>,
1347) -> Result<StatementDesc, PlanError> {
1348 let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
1349 Ok(StatementDesc::new(Some(desc)))
1350}
1351
1352pub fn describe_copy_item(
1353 scx: &StatementContext,
1354 object_name: <Aug as AstInfo>::ItemName,
1355 columns: Vec<Ident>,
1356) -> Result<StatementDesc, PlanError> {
1357 let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1358 Ok(StatementDesc::new(Some(desc)))
1359}
1360
1361pub fn describe_copy(
1362 scx: &StatementContext,
1363 CopyStatement {
1364 relation,
1365 direction,
1366 ..
1367 }: CopyStatement<Aug>,
1368) -> Result<StatementDesc, PlanError> {
1369 Ok(match (relation, direction) {
1370 (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1371 describe_copy_item(scx, name, columns)?
1372 }
1373 (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1374 describe_copy_from_table(scx, name, columns)?
1375 }
1376 (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1377 (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1378 }
1379 .with_is_copy())
1380}
1381
1382fn plan_copy_to_expr(
1383 scx: &StatementContext,
1384 select_plan: SelectPlan,
1385 desc: RelationDesc,
1386 to: &Expr<Aug>,
1387 format: CopyFormat,
1388 options: CopyOptionExtracted,
1389) -> Result<Plan, PlanError> {
1390 let conn_id = match options.aws_connection {
1391 Some(conn_id) => CatalogItemId::from(conn_id),
1392 None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1393 };
1394 let connection = scx.get_item(&conn_id).connection()?;
1395
1396 match connection {
1397 mz_storage_types::connections::Connection::Aws(_) => {}
1398 _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1399 }
1400
1401 let format = match format {
1402 CopyFormat::Csv => {
1403 let quote = extract_byte_param_value(options.quote, "quote")?;
1404 let escape = extract_byte_param_value(options.escape, "escape")?;
1405 let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1406 S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1407 CopyCsvFormatParams::try_new(
1408 delimiter,
1409 quote,
1410 escape,
1411 options.header,
1412 options.null,
1413 )
1414 .map_err(|e| sql_err!("{}", e))?,
1415 ))
1416 }
1417 CopyFormat::Parquet => {
1418 ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
1420 S3SinkFormat::Parquet
1421 }
1422 CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1423 CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1424 };
1425
1426 let mut to_expr = to.clone();
1428 transform_ast::transform(scx, &mut to_expr)?;
1429 let relation_type = RelationDesc::empty();
1430 let ecx = &ExprContext {
1431 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1432 name: "COPY TO target",
1433 scope: &Scope::empty(),
1434 relation_type: relation_type.typ(),
1435 allow_aggregates: false,
1436 allow_subqueries: false,
1437 allow_parameters: false,
1438 allow_windows: false,
1439 };
1440
1441 let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &SqlScalarType::String)?;
1442
1443 if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1444 sql_bail!(
1445 "MAX FILE SIZE cannot be less than {}",
1446 MIN_S3_SINK_FILE_SIZE
1447 );
1448 }
1449 if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1450 sql_bail!(
1451 "MAX FILE SIZE cannot be greater than {}",
1452 MAX_S3_SINK_FILE_SIZE
1453 );
1454 }
1455
1456 Ok(Plan::CopyTo(CopyToPlan {
1457 select_plan,
1458 desc,
1459 to,
1460 connection: connection.to_owned(),
1461 connection_id: conn_id,
1462 format,
1463 max_file_size: options.max_file_size.as_bytes(),
1464 }))
1465}
1466
1467fn plan_copy_from(
1468 scx: &StatementContext,
1469 target: &CopyTarget<Aug>,
1470 table_name: ResolvedItemName,
1471 columns: Vec<Ident>,
1472 format: CopyFormat,
1473 options: CopyOptionExtracted,
1474) -> Result<Plan, PlanError> {
1475 fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
1476 match option {
1477 Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
1478 None => Ok(()),
1479 }
1480 }
1481
1482 let source = match target {
1483 CopyTarget::Stdin => CopyFromSource::Stdin,
1484 CopyTarget::Expr(from) => {
1485 scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;
1486
1487 let mut from_expr = from.clone();
1489 transform_ast::transform(scx, &mut from_expr)?;
1490 let relation_type = RelationDesc::empty();
1491 let ecx = &ExprContext {
1492 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1493 name: "COPY FROM target",
1494 scope: &Scope::empty(),
1495 relation_type: relation_type.typ(),
1496 allow_aggregates: false,
1497 allow_subqueries: false,
1498 allow_parameters: false,
1499 allow_windows: false,
1500 };
1501 let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &SqlScalarType::String)?;
1502
1503 match options.aws_connection {
1504 Some(conn_id) => {
1505 let conn_id = CatalogItemId::from(conn_id);
1506
1507 let connection = match scx.get_item(&conn_id).connection()? {
1509 mz_storage_types::connections::Connection::Aws(conn) => conn,
1510 _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
1511 };
1512
1513 CopyFromSource::AwsS3 {
1514 uri: from,
1515 connection,
1516 connection_id: conn_id,
1517 }
1518 }
1519 None => CopyFromSource::Url(from),
1520 }
1521 }
1522 CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
1523 };
1524
1525 let params = match format {
1526 CopyFormat::Text => {
1527 only_available_with_csv(options.quote, "quote")?;
1528 only_available_with_csv(options.escape, "escape")?;
1529 only_available_with_csv(options.header, "HEADER")?;
1530 let delimiter =
1531 extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
1532 let null = match options.null {
1533 Some(null) => Cow::from(null),
1534 None => Cow::from("\\N"),
1535 };
1536 CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
1537 }
1538 CopyFormat::Csv => {
1539 let quote = extract_byte_param_value(options.quote, "quote")?;
1540 let escape = extract_byte_param_value(options.escape, "escape")?;
1541 let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1542 CopyFormatParams::Csv(
1543 CopyCsvFormatParams::try_new(
1544 delimiter,
1545 quote,
1546 escape,
1547 options.header,
1548 options.null,
1549 )
1550 .map_err(|e| sql_err!("{}", e))?,
1551 )
1552 }
1553 CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1554 CopyFormat::Parquet => CopyFormatParams::Parquet,
1555 };
1556
1557 let filter = match (options.files, options.pattern) {
1558 (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
1559 (Some(files), None) => Some(CopyFromFilter::Files(files)),
1560 (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
1561 (None, None) => None,
1562 };
1563
1564 if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
1565 bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
1566 }
1567
1568 let table_name_string = table_name.full_name_str();
1569
1570 let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;
1571
1572 let Some(mfp) = maybe_mfp else {
1573 sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
1574 };
1575
1576 Ok(Plan::CopyFrom(CopyFromPlan {
1577 target_id: id,
1578 target_name: table_name_string,
1579 source,
1580 columns,
1581 source_desc,
1582 mfp,
1583 params,
1584 filter,
1585 }))
1586}
1587
1588fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
1589 match v {
1590 Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
1591 Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
1592 None => Ok(None),
1593 }
1594}
1595
// Generates `CopyOptionExtracted`, the typed view of a `COPY` statement's
// options that `plan_copy` builds with `CopyOptionExtracted::try_from`. Each
// entry pairs an option name with the type its value is extracted as;
// `MaxFileSize` is the only entry that declares a default (256 MB).
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
1609
1610pub fn plan_copy(
1611 scx: &StatementContext,
1612 CopyStatement {
1613 relation,
1614 direction,
1615 target,
1616 options,
1617 }: CopyStatement<Aug>,
1618) -> Result<Plan, PlanError> {
1619 let options = CopyOptionExtracted::try_from(options)?;
1620 let format = options
1623 .format
1624 .as_ref()
1625 .map(|format| match format.to_lowercase().as_str() {
1626 "text" => Ok(CopyFormat::Text),
1627 "csv" => Ok(CopyFormat::Csv),
1628 "binary" => Ok(CopyFormat::Binary),
1629 "parquet" => Ok(CopyFormat::Parquet),
1630 _ => sql_bail!("unknown FORMAT: {}", format),
1631 })
1632 .transpose()?;
1633
1634 match (&direction, &target) {
1635 (CopyDirection::To, CopyTarget::Stdout) => {
1636 if options.delimiter.is_some() {
1637 sql_bail!("COPY TO does not support DELIMITER option yet");
1638 }
1639 if options.quote.is_some() {
1640 sql_bail!("COPY TO does not support QUOTE option yet");
1641 }
1642 if options.null.is_some() {
1643 sql_bail!("COPY TO does not support NULL option yet");
1644 }
1645 match relation {
1646 CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
1647 CopyRelation::Select(stmt) => Ok(plan_select(
1648 scx,
1649 stmt,
1650 &Params::empty(),
1651 Some(format.unwrap_or(CopyFormat::Text)),
1652 )?),
1653 CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
1654 scx,
1655 stmt,
1656 &Params::empty(),
1657 Some(format.unwrap_or(CopyFormat::Text)),
1658 )?),
1659 }
1660 }
1661 (CopyDirection::From, target) => match relation {
1662 CopyRelation::Named { name, columns } => plan_copy_from(
1663 scx,
1664 target,
1665 name,
1666 columns,
1667 format.unwrap_or(CopyFormat::Text),
1668 options,
1669 ),
1670 _ => sql_bail!("COPY FROM {} not supported", target),
1671 },
1672 (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
1673 if !scx.catalog.active_role_id().is_system() {
1678 scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
1679 }
1680
1681 let format = match format {
1682 Some(inner) => inner,
1683 _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
1684 };
1685
1686 let stmt = match relation {
1687 CopyRelation::Named { name, columns } => {
1688 if !columns.is_empty() {
1689 sql_bail!(
1691 "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
1692 );
1693 }
1694 let query = Query {
1696 ctes: CteBlock::empty(),
1697 body: SetExpr::Table(name),
1698 order_by: vec![],
1699 limit: None,
1700 offset: None,
1701 };
1702 SelectStatement { query, as_of: None }
1703 }
1704 CopyRelation::Select(stmt) => {
1705 if !stmt.query.order_by.is_empty() {
1706 sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
1707 }
1708 stmt
1709 }
1710 _ => sql_bail!("COPY {} {} not supported", direction, target),
1711 };
1712
1713 let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
1714 plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
1715 }
1716 _ => sql_bail!("COPY {} {} not supported", direction, target),
1717 }
1718}