1use std::borrow::Cow;
16use std::collections::BTreeMap;
17
18use itertools::Itertools;
19
20use mz_arrow_util::builder::ArrowBuilder;
21use mz_expr::{MirRelationExpr, RowSetFinishing};
22use mz_ore::num::NonNeg;
23use mz_ore::soft_panic_or_log;
24use mz_pgcopy::{CopyCsvFormatParams, CopyFormatParams, CopyTextFormatParams};
25use mz_repr::adt::numeric::NumericMaxScale;
26use mz_repr::bytes::ByteSize;
27use mz_repr::explain::{ExplainConfig, ExplainFormat};
28use mz_repr::optimize::OptimizerFeatureOverrides;
29use mz_repr::{CatalogItemId, Datum, RelationDesc, ScalarType};
30use mz_sql_parser::ast::{
31 CteBlock, ExplainPlanOption, ExplainPlanOptionName, ExplainPushdownStatement,
32 ExplainSinkSchemaFor, ExplainSinkSchemaStatement, ExplainTimestampStatement, Expr,
33 IfExistsBehavior, OrderByExpr, SetExpr, SubscribeOutput, UnresolvedItemName,
34};
35use mz_sql_parser::ident;
36use mz_storage_types::sinks::{
37 KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, MAX_S3_SINK_FILE_SIZE,
38 MIN_S3_SINK_FILE_SIZE, S3SinkFormat, StorageSinkConnection,
39};
40
41use crate::ast::display::AstDisplay;
42use crate::ast::{
43 AstInfo, CopyDirection, CopyOption, CopyOptionName, CopyRelation, CopyStatement, CopyTarget,
44 DeleteStatement, ExplainPlanStatement, ExplainStage, Explainee, Ident, InsertStatement, Query,
45 SelectStatement, SubscribeOption, SubscribeOptionName, SubscribeRelation, SubscribeStatement,
46 UpdateStatement,
47};
48use crate::catalog::CatalogItemType;
49use crate::names::{Aug, ResolvedItemName};
50use crate::normalize;
51use crate::plan::query::{ExprContext, QueryLifetime, plan_expr, plan_up_to};
52use crate::plan::scope::Scope;
53use crate::plan::statement::{StatementContext, StatementDesc, ddl};
54use crate::plan::{
55 self, CopyFromFilter, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan,
56 ExplainTimestampPlan, side_effecting_func, transform_ast,
57};
58use crate::plan::{
59 CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError,
60 QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, query,
61};
62use crate::plan::{CopyFromSource, with_options};
63use crate::session::vars::{self, ENABLE_COPY_FROM_REMOTE};
64
65pub fn describe_insert(
72 scx: &StatementContext,
73 InsertStatement {
74 table_name,
75 columns,
76 source,
77 returning,
78 }: InsertStatement<Aug>,
79) -> Result<StatementDesc, PlanError> {
80 let (_, _, returning) = query::plan_insert_query(scx, table_name, columns, source, returning)?;
81 let desc = if returning.expr.is_empty() {
82 None
83 } else {
84 Some(returning.desc)
85 };
86 Ok(StatementDesc::new(desc))
87}
88
89pub fn plan_insert(
90 scx: &StatementContext,
91 InsertStatement {
92 table_name,
93 columns,
94 source,
95 returning,
96 }: InsertStatement<Aug>,
97 params: &Params,
98) -> Result<Plan, PlanError> {
99 let (id, mut expr, returning) =
100 query::plan_insert_query(scx, table_name, columns, source, returning)?;
101 expr.bind_parameters(params)?;
102 let returning = returning
103 .expr
104 .into_iter()
105 .map(|expr| expr.lower_uncorrelated())
106 .collect::<Result<Vec<_>, _>>()?;
107
108 Ok(Plan::Insert(InsertPlan {
109 id,
110 values: expr,
111 returning,
112 }))
113}
114
115pub fn describe_delete(
116 scx: &StatementContext,
117 stmt: DeleteStatement<Aug>,
118) -> Result<StatementDesc, PlanError> {
119 query::plan_delete_query(scx, stmt)?;
120 Ok(StatementDesc::new(None))
121}
122
123pub fn plan_delete(
124 scx: &StatementContext,
125 stmt: DeleteStatement<Aug>,
126 params: &Params,
127) -> Result<Plan, PlanError> {
128 let rtw_plan = query::plan_delete_query(scx, stmt)?;
129 plan_read_then_write(MutationKind::Delete, params, rtw_plan)
130}
131
132pub fn describe_update(
133 scx: &StatementContext,
134 stmt: UpdateStatement<Aug>,
135) -> Result<StatementDesc, PlanError> {
136 query::plan_update_query(scx, stmt)?;
137 Ok(StatementDesc::new(None))
138}
139
140pub fn plan_update(
141 scx: &StatementContext,
142 stmt: UpdateStatement<Aug>,
143 params: &Params,
144) -> Result<Plan, PlanError> {
145 let rtw_plan = query::plan_update_query(scx, stmt)?;
146 plan_read_then_write(MutationKind::Update, params, rtw_plan)
147}
148
149pub fn plan_read_then_write(
150 kind: MutationKind,
151 params: &Params,
152 query::ReadThenWritePlan {
153 id,
154 mut selection,
155 finishing,
156 assignments,
157 }: query::ReadThenWritePlan,
158) -> Result<Plan, PlanError> {
159 selection.bind_parameters(params)?;
160 let mut assignments_outer = BTreeMap::new();
161 for (idx, mut set) in assignments {
162 set.bind_parameters(params)?;
163 let set = set.lower_uncorrelated()?;
164 assignments_outer.insert(idx, set);
165 }
166
167 Ok(Plan::ReadThenWrite(ReadThenWritePlan {
168 id,
169 selection,
170 finishing,
171 assignments: assignments_outer,
172 kind,
173 returning: Vec::new(),
174 }))
175}
176
177pub fn describe_select(
178 scx: &StatementContext,
179 stmt: SelectStatement<Aug>,
180) -> Result<StatementDesc, PlanError> {
181 if let Some(desc) = side_effecting_func::describe_select_if_side_effecting(scx, &stmt)? {
182 return Ok(StatementDesc::new(Some(desc)));
183 }
184
185 let query::PlannedRootQuery { desc, .. } =
186 query::plan_root_query(scx, stmt.query, QueryLifetime::OneShot)?;
187 Ok(StatementDesc::new(Some(desc)))
188}
189
190pub fn plan_select(
191 scx: &StatementContext,
192 select: SelectStatement<Aug>,
193 params: &Params,
194 copy_to: Option<CopyFormat>,
195) -> Result<Plan, PlanError> {
196 if let Some(f) = side_effecting_func::plan_select_if_side_effecting(scx, &select, params)? {
197 return Ok(Plan::SideEffectingFunc(f));
198 }
199
200 let (plan, _desc) = plan_select_inner(scx, select, params, copy_to)?;
201 Ok(Plan::Select(plan))
202}
203
204fn plan_select_inner(
205 scx: &StatementContext,
206 select: SelectStatement<Aug>,
207 params: &Params,
208 copy_to: Option<CopyFormat>,
209) -> Result<(SelectPlan, RelationDesc), PlanError> {
210 let when = query::plan_as_of(scx, select.as_of.clone())?;
211 let query::PlannedRootQuery {
212 mut expr,
213 desc,
214 finishing,
215 scope: _,
216 } = query::plan_root_query(scx, select.query.clone(), QueryLifetime::OneShot)?;
217 expr.bind_parameters(params)?;
218
219 let limit = match finishing.limit {
221 None => None,
222 Some(mut limit) => {
223 limit.bind_parameters(params)?;
224 let Some(limit) = limit.as_literal() else {
225 sql_bail!("Top-level LIMIT must be a constant expression")
226 };
227 match limit {
228 Datum::Null => None,
229 Datum::Int64(v) if v >= 0 => NonNeg::<i64>::try_from(v).ok(),
230 _ => {
231 soft_panic_or_log!("Valid literal limit must be asserted in `plan_select`");
232 sql_bail!("LIMIT must be a non negative INT or NULL")
233 }
234 }
235 }
236 };
237
238 let plan = SelectPlan {
239 source: expr,
240 when,
241 finishing: RowSetFinishing {
242 limit,
243 offset: finishing.offset,
244 project: finishing.project,
245 order_by: finishing.order_by,
246 },
247 copy_to,
248 select: Some(Box::new(select)),
249 };
250
251 Ok((plan, desc))
252}
253
254pub fn describe_explain_plan(
255 scx: &StatementContext,
256 explain: ExplainPlanStatement<Aug>,
257) -> Result<StatementDesc, PlanError> {
258 let mut relation_desc = RelationDesc::builder();
259
260 match explain.stage() {
261 ExplainStage::RawPlan => {
262 let name = "Raw Plan";
263 relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
264 }
265 ExplainStage::DecorrelatedPlan => {
266 let name = "Decorrelated Plan";
267 relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
268 }
269 ExplainStage::LocalPlan => {
270 let name = "Locally Optimized Plan";
271 relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
272 }
273 ExplainStage::GlobalPlan => {
274 let name = "Optimized Plan";
275 relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
276 }
277 ExplainStage::PhysicalPlan => {
278 let name = "Physical Plan";
279 relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
280 }
281 ExplainStage::Trace => {
282 relation_desc = relation_desc
283 .with_column("Time", ScalarType::UInt64.nullable(false))
284 .with_column("Path", ScalarType::String.nullable(false))
285 .with_column("Plan", ScalarType::String.nullable(false));
286 }
287 ExplainStage::PlanInsights => {
288 let name = "Plan Insights";
289 relation_desc = relation_desc.with_column(name, ScalarType::String.nullable(false));
290 }
291 };
292 let relation_desc = relation_desc.finish();
293
294 Ok(
295 StatementDesc::new(Some(relation_desc)).with_params(match explain.explainee {
296 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
297 _ => vec![],
298 }),
299 )
300}
301
302pub fn describe_explain_pushdown(
303 scx: &StatementContext,
304 statement: ExplainPushdownStatement<Aug>,
305) -> Result<StatementDesc, PlanError> {
306 let relation_desc = RelationDesc::builder()
307 .with_column("Source", ScalarType::String.nullable(false))
308 .with_column("Total Bytes", ScalarType::UInt64.nullable(false))
309 .with_column("Selected Bytes", ScalarType::UInt64.nullable(false))
310 .with_column("Total Parts", ScalarType::UInt64.nullable(false))
311 .with_column("Selected Parts", ScalarType::UInt64.nullable(false))
312 .finish();
313
314 Ok(
315 StatementDesc::new(Some(relation_desc)).with_params(match statement.explainee {
316 Explainee::Select(select, _) => describe_select(scx, *select)?.param_types,
317 _ => vec![],
318 }),
319 )
320}
321
322pub fn describe_explain_timestamp(
323 scx: &StatementContext,
324 ExplainTimestampStatement { select, .. }: ExplainTimestampStatement<Aug>,
325) -> Result<StatementDesc, PlanError> {
326 let relation_desc = RelationDesc::builder()
327 .with_column("Timestamp", ScalarType::String.nullable(false))
328 .finish();
329
330 Ok(StatementDesc::new(Some(relation_desc))
331 .with_params(describe_select(scx, select)?.param_types))
332}
333
334pub fn describe_explain_schema(
335 _: &StatementContext,
336 ExplainSinkSchemaStatement { .. }: ExplainSinkSchemaStatement<Aug>,
337) -> Result<StatementDesc, PlanError> {
338 let relation_desc = RelationDesc::builder()
339 .with_column("Schema", ScalarType::String.nullable(false))
340 .finish();
341 Ok(StatementDesc::new(Some(relation_desc)))
342}
343
// Declares the options accepted by `EXPLAIN ... PLAN`, one tuple per option:
// the option name, the type of its value, and the default used when the option
// is not given.
//
// The extracted values are consumed through `ExplainPlanOptionExtracted`,
// whose fields are read below under snake_case names (e.g. `RawPlans` is read
// as `raw_plans`).
//
// Options typed `Option<bool>` default to `None`, which lets the consumer
// distinguish "not specified" from an explicit `true`/`false`.
generate_extracted_config!(
    ExplainPlanOption,
    (Arity, Option<bool>, Default(None)),
    (Cardinality, bool, Default(false)),
    (ColumnNames, bool, Default(false)),
    (FilterPushdown, Option<bool>, Default(None)),
    (HumanizedExpressions, Option<bool>, Default(None)),
    (JoinImplementations, bool, Default(false)),
    (Keys, bool, Default(false)),
    (LinearChains, bool, Default(false)),
    (NoFastPath, bool, Default(false)),
    (NonNegative, bool, Default(false)),
    (NoNotices, bool, Default(false)),
    (NodeIdentifiers, bool, Default(false)),
    (Raw, bool, Default(false)),
    (RawPlans, bool, Default(false)),
    (RawSyntax, bool, Default(false)),
    (Redacted, bool, Default(false)),
    (SubtreeSize, bool, Default(false)),
    (Timing, bool, Default(false)),
    (Types, bool, Default(false)),
    (Equivalences, bool, Default(false)),
    // Optimizer feature overrides; `None` means "no override requested".
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
386
387impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
388 type Error = PlanError;
389
390 fn try_from(mut v: ExplainPlanOptionExtracted) -> Result<Self, Self::Error> {
391 if v.raw {
394 v.raw_plans = true;
395 v.raw_syntax = true;
396 }
397
398 let enable_on_prod = !mz_ore::assert::soft_assertions_enabled();
401
402 Ok(ExplainConfig {
403 arity: v.arity.unwrap_or(enable_on_prod),
404 cardinality: v.cardinality,
405 column_names: v.column_names,
406 filter_pushdown: v.filter_pushdown.unwrap_or(enable_on_prod),
407 humanized_exprs: !v.raw_plans && (v.humanized_expressions.unwrap_or(enable_on_prod)),
408 join_impls: v.join_implementations,
409 keys: v.keys,
410 linear_chains: !v.raw_plans && v.linear_chains,
411 no_fast_path: v.no_fast_path,
412 no_notices: v.no_notices,
413 node_ids: v.node_identifiers,
414 non_negative: v.non_negative,
415 raw_plans: v.raw_plans,
416 raw_syntax: v.raw_syntax,
417 redacted: v.redacted,
418 subtree_size: v.subtree_size,
419 equivalences: v.equivalences,
420 timing: v.timing,
421 types: v.types,
422 features: OptimizerFeatureOverrides {
424 enable_eager_delta_joins: v.enable_eager_delta_joins,
425 enable_new_outer_join_lowering: v.enable_new_outer_join_lowering,
426 enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering,
427 enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis,
428 enable_consolidate_after_union_negate: Default::default(),
429 enable_reduce_mfp_fusion: Default::default(),
430 enable_cardinality_estimates: Default::default(),
431 persist_fast_path_limit: Default::default(),
432 reoptimize_imported_views: v.reoptimize_imported_views,
433 enable_reduce_reduction: Default::default(),
434 enable_join_prioritize_arranged: v.enable_join_prioritize_arranged,
435 enable_projection_pushdown_after_relation_cse: v
436 .enable_projection_pushdown_after_relation_cse,
437 enable_less_reduce_in_eqprop: Default::default(),
438 enable_dequadratic_eqprop_map: Default::default(),
439 },
440 })
441 }
442}
443
444fn plan_explainee(
445 scx: &StatementContext,
446 explainee: Explainee<Aug>,
447 params: &Params,
448) -> Result<plan::Explainee, PlanError> {
449 use crate::plan::ExplaineeStatement;
450
451 let is_replan = matches!(
452 explainee,
453 Explainee::ReplanView(_) | Explainee::ReplanMaterializedView(_) | Explainee::ReplanIndex(_)
454 );
455
456 let explainee = match explainee {
457 Explainee::View(name) | Explainee::ReplanView(name) => {
458 let item = scx.get_item_by_resolved_name(&name)?;
459 let item_type = item.item_type();
460 if item_type != CatalogItemType::View {
461 sql_bail!("Expected {name} to be a view, not a {item_type}");
462 }
463 match is_replan {
464 true => crate::plan::Explainee::ReplanView(item.id()),
465 false => crate::plan::Explainee::View(item.id()),
466 }
467 }
468 Explainee::MaterializedView(name) | Explainee::ReplanMaterializedView(name) => {
469 let item = scx.get_item_by_resolved_name(&name)?;
470 let item_type = item.item_type();
471 if item_type != CatalogItemType::MaterializedView {
472 sql_bail!("Expected {name} to be a materialized view, not a {item_type}");
473 }
474 match is_replan {
475 true => crate::plan::Explainee::ReplanMaterializedView(item.id()),
476 false => crate::plan::Explainee::MaterializedView(item.id()),
477 }
478 }
479 Explainee::Index(name) | Explainee::ReplanIndex(name) => {
480 let item = scx.get_item_by_resolved_name(&name)?;
481 let item_type = item.item_type();
482 if item_type != CatalogItemType::Index {
483 sql_bail!("Expected {name} to be an index, not a {item_type}");
484 }
485 match is_replan {
486 true => crate::plan::Explainee::ReplanIndex(item.id()),
487 false => crate::plan::Explainee::Index(item.id()),
488 }
489 }
490 Explainee::Select(select, broken) => {
491 let (plan, desc) = plan_select_inner(scx, *select, params, None)?;
492 crate::plan::Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc })
493 }
494 Explainee::CreateView(mut stmt, broken) => {
495 if stmt.if_exists != IfExistsBehavior::Skip {
496 stmt.if_exists = IfExistsBehavior::Skip;
501 } else {
502 sql_bail!(
503 "Cannot EXPLAIN a CREATE VIEW that explictly sets IF NOT EXISTS \
504 (the behavior is implied within the scope of an enclosing EXPLAIN)"
505 );
506 }
507
508 let Plan::CreateView(plan) = ddl::plan_create_view(scx, *stmt, params)? else {
509 sql_bail!("expected CreateViewPlan plan");
510 };
511
512 crate::plan::Explainee::Statement(ExplaineeStatement::CreateView { broken, plan })
513 }
514 Explainee::CreateMaterializedView(mut stmt, broken) => {
515 if stmt.if_exists != IfExistsBehavior::Skip {
516 stmt.if_exists = IfExistsBehavior::Skip;
521 } else {
522 sql_bail!(
523 "Cannot EXPLAIN a CREATE MATERIALIZED VIEW that explictly sets IF NOT EXISTS \
524 (the behavior is implied within the scope of an enclosing EXPLAIN)"
525 );
526 }
527
528 let Plan::CreateMaterializedView(plan) =
529 ddl::plan_create_materialized_view(scx, *stmt, params)?
530 else {
531 sql_bail!("expected CreateMaterializedViewPlan plan");
532 };
533
534 crate::plan::Explainee::Statement(ExplaineeStatement::CreateMaterializedView {
535 broken,
536 plan,
537 })
538 }
539 Explainee::CreateIndex(mut stmt, broken) => {
540 if !stmt.if_not_exists {
541 stmt.if_not_exists = true;
544 } else {
545 sql_bail!(
546 "Cannot EXPLAIN a CREATE INDEX that explictly sets IF NOT EXISTS \
547 (the behavior is implied within the scope of an enclosing EXPLAIN)"
548 );
549 }
550
551 let Plan::CreateIndex(plan) = ddl::plan_create_index(scx, *stmt)? else {
552 sql_bail!("expected CreateIndexPlan plan");
553 };
554
555 crate::plan::Explainee::Statement(ExplaineeStatement::CreateIndex { broken, plan })
556 }
557 };
558
559 Ok(explainee)
560}
561
562pub fn plan_explain_plan(
563 scx: &StatementContext,
564 explain: ExplainPlanStatement<Aug>,
565 params: &Params,
566) -> Result<Plan, PlanError> {
567 let format = match explain.format() {
568 mz_sql_parser::ast::ExplainFormat::Text => ExplainFormat::Text,
569 mz_sql_parser::ast::ExplainFormat::VerboseText => ExplainFormat::VerboseText,
570 mz_sql_parser::ast::ExplainFormat::Json => ExplainFormat::Json,
571 mz_sql_parser::ast::ExplainFormat::Dot => ExplainFormat::Dot,
572 };
573 let stage = explain.stage();
574
575 let config = {
577 let mut with_options = ExplainPlanOptionExtracted::try_from(explain.with_options)?;
578
579 if !scx.catalog.system_vars().persist_stats_filter_enabled() {
580 with_options.filter_pushdown = Some(false);
582 }
583
584 ExplainConfig::try_from(with_options)?
585 };
586
587 let explainee = plan_explainee(scx, explain.explainee, params)?;
588
589 Ok(Plan::ExplainPlan(ExplainPlanPlan {
590 stage,
591 format,
592 config,
593 explainee,
594 }))
595}
596
597pub fn plan_explain_schema(
598 scx: &StatementContext,
599 explain_schema: ExplainSinkSchemaStatement<Aug>,
600) -> Result<Plan, PlanError> {
601 let ExplainSinkSchemaStatement {
602 schema_for,
603 format: _,
605 mut statement,
606 } = explain_schema;
607
608 statement.name = Some(UnresolvedItemName::qualified(&[
612 ident!("mz_catalog"),
613 ident!("mz_explain_schema"),
614 ]));
615
616 crate::pure::purify_create_sink_avro_doc_on_options(
617 scx.catalog,
618 *statement.from.item_id(),
619 &mut statement.format,
620 )?;
621
622 match ddl::plan_create_sink(scx, statement)? {
623 Plan::CreateSink(CreateSinkPlan { sink, .. }) => match sink.connection {
624 StorageSinkConnection::Kafka(KafkaSinkConnection {
625 format:
626 KafkaSinkFormat {
627 key_format,
628 value_format:
629 KafkaSinkFormatType::Avro {
630 schema: value_schema,
631 ..
632 },
633 ..
634 },
635 ..
636 }) => {
637 let schema = match schema_for {
638 ExplainSinkSchemaFor::Key => key_format
639 .and_then(|f| match f {
640 KafkaSinkFormatType::Avro { schema, .. } => Some(schema),
641 _ => None,
642 })
643 .ok_or_else(|| sql_err!("CREATE SINK does not have a key"))?,
644 ExplainSinkSchemaFor::Value => value_schema,
645 };
646
647 Ok(Plan::ExplainSinkSchema(ExplainSinkSchemaPlan {
648 sink_from: sink.from,
649 json_schema: schema,
650 }))
651 }
652 _ => bail_unsupported!(
653 "EXPLAIN SCHEMA is only available for Kafka sinks with Avro schemas"
654 ),
655 },
656 _ => unreachable!("plan_create_sink returns a CreateSinkPlan"),
657 }
658}
659
660pub fn plan_explain_pushdown(
661 scx: &StatementContext,
662 statement: ExplainPushdownStatement<Aug>,
663 params: &Params,
664) -> Result<Plan, PlanError> {
665 scx.require_feature_flag(&vars::ENABLE_EXPLAIN_PUSHDOWN)?;
666 let explainee = plan_explainee(scx, statement.explainee, params)?;
667 Ok(Plan::ExplainPushdown(ExplainPushdownPlan { explainee }))
668}
669
670pub fn plan_explain_timestamp(
671 scx: &StatementContext,
672 explain: ExplainTimestampStatement<Aug>,
673 params: &Params,
674) -> Result<Plan, PlanError> {
675 let format = match explain.format() {
676 mz_sql_parser::ast::ExplainFormat::Text => ExplainFormat::Text,
677 mz_sql_parser::ast::ExplainFormat::VerboseText => ExplainFormat::VerboseText,
678 mz_sql_parser::ast::ExplainFormat::Json => ExplainFormat::Json,
679 mz_sql_parser::ast::ExplainFormat::Dot => ExplainFormat::Dot,
680 };
681
682 let raw_plan = {
683 let query::PlannedRootQuery {
684 expr: mut raw_plan,
685 desc: _,
686 finishing: _,
687 scope: _,
688 } = query::plan_root_query(scx, explain.select.query, QueryLifetime::OneShot)?;
689 raw_plan.bind_parameters(params)?;
690
691 raw_plan
692 };
693 let when = query::plan_as_of(scx, explain.select.as_of)?;
694
695 Ok(Plan::ExplainTimestamp(ExplainTimestampPlan {
696 format,
697 raw_plan,
698 when,
699 }))
700}
701
702#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."]
705pub fn plan_query(
706 scx: &StatementContext,
707 query: Query<Aug>,
708 params: &Params,
709 lifetime: QueryLifetime,
710) -> Result<query::PlannedRootQuery<MirRelationExpr>, PlanError> {
711 let query::PlannedRootQuery {
712 mut expr,
713 desc,
714 finishing,
715 scope,
716 } = query::plan_root_query(scx, query, lifetime)?;
717 expr.bind_parameters(params)?;
718
719 Ok(query::PlannedRootQuery {
720 expr: expr.lower(scx.catalog.system_vars(), None)?,
722 desc,
723 finishing,
724 scope,
725 })
726}
727
// Declares the options accepted by `SUBSCRIBE`: `Snapshot` and `Progress`,
// both boolean and with no default supplied here. They are read below through
// `SubscribeOptionExtracted` as the optional fields `snapshot` and `progress`,
// and callers choose the fallback with `unwrap_or`.
generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool));
729
730pub fn describe_subscribe(
731 scx: &StatementContext,
732 stmt: SubscribeStatement<Aug>,
733) -> Result<StatementDesc, PlanError> {
734 let relation_desc = match stmt.relation {
735 SubscribeRelation::Name(name) => {
736 let item = scx.get_item_by_resolved_name(&name)?;
737 item.desc(&scx.catalog.resolve_full_name(item.name()))?
738 .into_owned()
739 }
740 SubscribeRelation::Query(query) => {
741 let query::PlannedRootQuery { desc, .. } =
742 query::plan_root_query(scx, query, QueryLifetime::Subscribe)?;
743 desc
744 }
745 };
746 let SubscribeOptionExtracted { progress, .. } = stmt.options.try_into()?;
747 let progress = progress.unwrap_or(false);
748 let mut desc = RelationDesc::builder().with_column(
749 "mz_timestamp",
750 ScalarType::Numeric {
751 max_scale: Some(NumericMaxScale::ZERO),
752 }
753 .nullable(false),
754 );
755 if progress {
756 desc = desc.with_column("mz_progressed", ScalarType::Bool.nullable(false));
757 }
758
759 let debezium = matches!(stmt.output, SubscribeOutput::EnvelopeDebezium { .. });
760 match stmt.output {
761 SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
762 desc = desc.with_column("mz_diff", ScalarType::Int64.nullable(true));
763 for (name, mut ty) in relation_desc.into_iter() {
764 if progress {
765 ty.nullable = true;
766 }
767 desc = desc.with_column(name, ty);
768 }
769 }
770 SubscribeOutput::EnvelopeUpsert { key_columns }
771 | SubscribeOutput::EnvelopeDebezium { key_columns } => {
772 desc = desc.with_column("mz_state", ScalarType::String.nullable(true));
773 let key_columns = key_columns
774 .into_iter()
775 .map(normalize::column_name)
776 .collect_vec();
777 let mut before_values_desc = RelationDesc::builder();
778 let mut after_values_desc = RelationDesc::builder();
779
780 for column_name in &key_columns {
782 let mut column_ty = relation_desc
783 .get_by_name(column_name)
784 .map(|(_pos, ty)| ty.clone())
785 .ok_or_else(|| PlanError::UnknownColumn {
786 table: None,
787 column: column_name.clone(),
788 similar: Box::new([]),
789 })?;
790 if progress {
791 column_ty.nullable = true;
792 }
793 desc = desc.with_column(column_name, column_ty);
794 }
795
796 for (mut name, mut ty) in relation_desc
799 .into_iter()
800 .filter(|(name, _ty)| !key_columns.contains(name))
801 {
802 ty.nullable = true;
803 before_values_desc =
804 before_values_desc.with_column(format!("before_{}", name), ty.clone());
805 if debezium {
806 name = format!("after_{}", name).into();
807 }
808 after_values_desc = after_values_desc.with_column(name, ty);
809 }
810
811 if debezium {
812 desc = desc.concat(before_values_desc);
813 }
814 desc = desc.concat(after_values_desc);
815 }
816 }
817 Ok(StatementDesc::new(Some(desc.finish())))
818}
819
/// Plans a `SUBSCRIBE` statement into a `Plan::Subscribe`.
///
/// The subscribed relation is either a named catalog item, referenced by its
/// global id, or a query, which is planned and lowered via `plan_query`. The
/// `AS OF` and `UP TO` clauses are planned, and the requested output mode is
/// translated into a `plan::SubscribeOutput`.
///
/// When not specified, the `Snapshot` option defaults to `true` and the
/// `Progress` option defaults to `false`. `copy_to` is forwarded unchanged.
///
/// # Errors
///
/// Fails if the named item has no relation description, if a required feature
/// flag check fails (Debezium envelope, within-timestamp ordering), if the
/// key / order-by expressions are rejected, or if any nested planning step
/// fails.
///
/// # Panics
///
/// Panics if the finishing of a subscribed query is not trivial for the
/// query's arity.
pub fn plan_subscribe(
    scx: &StatementContext,
    SubscribeStatement {
        relation,
        options,
        as_of,
        up_to,
        output,
    }: SubscribeStatement<Aug>,
    params: &Params,
    copy_to: Option<CopyFormat>,
) -> Result<Plan, PlanError> {
    // Resolve what is being subscribed to, along with its description and the
    // scope used to resolve column references in the output clauses below.
    let (from, desc, scope) = match relation {
        SubscribeRelation::Name(name) => {
            let entry = scx.get_item_by_resolved_name(&name)?;
            // Any failure to obtain a description is reported as the item not
            // being subscribable, naming the item and its type.
            let desc = match entry.desc(&scx.catalog.resolve_full_name(entry.name())) {
                Ok(desc) => desc,
                Err(..) => sql_bail!(
                    "'{}' cannot be subscribed to because it is a {}",
                    name.full_name_str(),
                    entry.item_type(),
                ),
            };
            let item_name = match name {
                ResolvedItemName::Item { full_name, .. } => Some(full_name.into()),
                _ => None,
            };
            let scope = Scope::from_source(item_name, desc.iter().map(|(name, _type)| name));
            (
                SubscribeFrom::Id(entry.global_id()),
                desc.into_owned(),
                scope,
            )
        }
        SubscribeRelation::Query(query) => {
            // `plan_query` is deprecated but still used here.
            #[allow(deprecated)]
            let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?;
            // The code below has no place to carry a non-trivial finishing, so
            // planning must not have produced one.
            assert!(query.finishing.is_trivial(query.desc.arity()));
            let desc = query.desc.clone();
            (
                SubscribeFrom::Query {
                    expr: query.expr,
                    desc: query.desc,
                },
                desc,
                query.scope,
            )
        }
    };

    let when = query::plan_as_of(scx, as_of)?;
    let up_to = up_to.map(|up_to| plan_up_to(scx, up_to)).transpose()?;

    // Expression context shared by the output clauses; each clause overrides
    // only `name`.
    let qcx = QueryContext::root(scx, QueryLifetime::Subscribe);
    let ecx = ExprContext {
        qcx: &qcx,
        name: "",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let output_columns: Vec<_> = scope.column_names().enumerate().collect();
    let output = match output {
        SubscribeOutput::Diffs => plan::SubscribeOutput::Diffs,
        SubscribeOutput::EnvelopeUpsert { key_columns } => {
            // The key columns are planned as if they were `ORDER BY` items
            // that reference the output columns by identifier.
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE UPSERT KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            // NOTE(review): presumably `map_exprs` holds expressions that are
            // not plain output columns; any such expression is rejected.
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeUpsert);
            }
            plan::SubscribeOutput::EnvelopeUpsert {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::EnvelopeDebezium { key_columns } => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_DEBEZIUM_IN_SUBSCRIBE)?;
            let order_by = key_columns
                .iter()
                .map(|ident| OrderByExpr {
                    expr: Expr::Identifier(vec![ident.clone()]),
                    asc: None,
                    nulls_last: None,
                })
                .collect_vec();
            let (order_by, map_exprs) = query::plan_order_by_exprs(
                &ExprContext {
                    name: "ENVELOPE DEBEZIUM KEY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            )?;
            if !map_exprs.is_empty() {
                return Err(PlanError::InvalidKeysInSubscribeEnvelopeDebezium);
            }
            plan::SubscribeOutput::EnvelopeDebezium {
                order_by_keys: order_by,
            }
        }
        SubscribeOutput::WithinTimestampOrderBy { order_by } => {
            scx.require_feature_flag(&vars::ENABLE_WITHIN_TIMESTAMP_ORDER_BY_IN_SUBSCRIBE)?;
            // `mz_diff` is made available to the ordering clause as column 0;
            // every column of the relation is shifted one position right.
            let mz_diff = "mz_diff".into();
            let output_columns = std::iter::once((0, &mz_diff))
                .chain(output_columns.into_iter().map(|(i, c)| (i + 1, c)))
                .collect_vec();
            match query::plan_order_by_exprs(
                &ExprContext {
                    name: "WITHIN TIMESTAMP ORDER BY clause",
                    ..ecx
                },
                &order_by[..],
                &output_columns[..],
            ) {
                // An unqualified unknown-column error naming `mz_diff` is
                // replaced by the dedicated order-by error.
                Err(PlanError::UnknownColumn {
                    table: None,
                    column,
                    similar: _,
                }) if &column == &mz_diff => {
                    return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                }
                Err(e) => return Err(e),
                Ok((order_by, map_exprs)) => {
                    if !map_exprs.is_empty() {
                        return Err(PlanError::InvalidOrderByInSubscribeWithinTimestampOrderBy);
                    }

                    plan::SubscribeOutput::WithinTimestampOrderBy { order_by }
                }
            }
        }
    };

    let SubscribeOptionExtracted {
        progress, snapshot, ..
    } = options.try_into()?;
    Ok(Plan::Subscribe(SubscribePlan {
        from,
        when,
        up_to,
        // Snapshot is on unless explicitly disabled; progress is off unless
        // explicitly enabled.
        with_snapshot: snapshot.unwrap_or(true),
        copy_to,
        emit_progress: progress.unwrap_or(false),
        output,
    }))
}
988
989pub fn describe_copy_from_table(
990 scx: &StatementContext,
991 table_name: <Aug as AstInfo>::ItemName,
992 columns: Vec<Ident>,
993) -> Result<StatementDesc, PlanError> {
994 let (_, desc, _, _) = query::plan_copy_from(scx, table_name, columns)?;
995 Ok(StatementDesc::new(Some(desc)))
996}
997
998pub fn describe_copy_item(
999 scx: &StatementContext,
1000 object_name: <Aug as AstInfo>::ItemName,
1001 columns: Vec<Ident>,
1002) -> Result<StatementDesc, PlanError> {
1003 let (_, desc, _, _) = query::plan_copy_item(scx, object_name, columns)?;
1004 Ok(StatementDesc::new(Some(desc)))
1005}
1006
1007pub fn describe_copy(
1008 scx: &StatementContext,
1009 CopyStatement {
1010 relation,
1011 direction,
1012 ..
1013 }: CopyStatement<Aug>,
1014) -> Result<StatementDesc, PlanError> {
1015 Ok(match (relation, direction) {
1016 (CopyRelation::Named { name, columns }, CopyDirection::To) => {
1017 describe_copy_item(scx, name, columns)?
1018 }
1019 (CopyRelation::Named { name, columns }, CopyDirection::From) => {
1020 describe_copy_from_table(scx, name, columns)?
1021 }
1022 (CopyRelation::Select(stmt), _) => describe_select(scx, stmt)?,
1023 (CopyRelation::Subscribe(stmt), _) => describe_subscribe(scx, stmt)?,
1024 }
1025 .with_is_copy())
1026}
1027
1028fn plan_copy_to_expr(
1029 scx: &StatementContext,
1030 select_plan: SelectPlan,
1031 desc: RelationDesc,
1032 to: &Expr<Aug>,
1033 format: CopyFormat,
1034 options: CopyOptionExtracted,
1035) -> Result<Plan, PlanError> {
1036 let conn_id = match options.aws_connection {
1037 Some(conn_id) => CatalogItemId::from(conn_id),
1038 None => sql_bail!("AWS CONNECTION is required for COPY ... TO <expr>"),
1039 };
1040 let connection = scx.get_item(&conn_id).connection()?;
1041
1042 match connection {
1043 mz_storage_types::connections::Connection::Aws(_) => {}
1044 _ => sql_bail!("only AWS CONNECTION is supported for COPY ... TO <expr>"),
1045 }
1046
1047 let format = match format {
1048 CopyFormat::Csv => {
1049 let quote = extract_byte_param_value(options.quote, "quote")?;
1050 let escape = extract_byte_param_value(options.escape, "escape")?;
1051 let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1052 S3SinkFormat::PgCopy(CopyFormatParams::Csv(
1053 CopyCsvFormatParams::try_new(
1054 delimiter,
1055 quote,
1056 escape,
1057 options.header,
1058 options.null,
1059 )
1060 .map_err(|e| sql_err!("{}", e))?,
1061 ))
1062 }
1063 CopyFormat::Parquet => {
1064 ArrowBuilder::validate_desc(&desc).map_err(|e| sql_err!("{}", e))?;
1066 S3SinkFormat::Parquet
1067 }
1068 CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1069 CopyFormat::Text => bail_unsupported!("FORMAT TEXT"),
1070 };
1071
1072 let mut to_expr = to.clone();
1074 transform_ast::transform(scx, &mut to_expr)?;
1075 let relation_type = RelationDesc::empty();
1076 let ecx = &ExprContext {
1077 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1078 name: "COPY TO target",
1079 scope: &Scope::empty(),
1080 relation_type: relation_type.typ(),
1081 allow_aggregates: false,
1082 allow_subqueries: false,
1083 allow_parameters: false,
1084 allow_windows: false,
1085 };
1086
1087 let to = plan_expr(ecx, &to_expr)?.type_as(ecx, &ScalarType::String)?;
1088
1089 if options.max_file_size.as_bytes() < MIN_S3_SINK_FILE_SIZE.as_bytes() {
1090 sql_bail!(
1091 "MAX FILE SIZE cannot be less than {}",
1092 MIN_S3_SINK_FILE_SIZE
1093 );
1094 }
1095 if options.max_file_size.as_bytes() > MAX_S3_SINK_FILE_SIZE.as_bytes() {
1096 sql_bail!(
1097 "MAX FILE SIZE cannot be greater than {}",
1098 MAX_S3_SINK_FILE_SIZE
1099 );
1100 }
1101
1102 Ok(Plan::CopyTo(CopyToPlan {
1103 select_plan,
1104 desc,
1105 to,
1106 connection: connection.to_owned(),
1107 connection_id: conn_id,
1108 format,
1109 max_file_size: options.max_file_size.as_bytes(),
1110 }))
1111}
1112
1113fn plan_copy_from(
1114 scx: &StatementContext,
1115 target: &CopyTarget<Aug>,
1116 table_name: ResolvedItemName,
1117 columns: Vec<Ident>,
1118 format: CopyFormat,
1119 options: CopyOptionExtracted,
1120) -> Result<Plan, PlanError> {
1121 fn only_available_with_csv<T>(option: Option<T>, param: &str) -> Result<(), PlanError> {
1122 match option {
1123 Some(_) => sql_bail!("COPY {} available only in CSV mode", param),
1124 None => Ok(()),
1125 }
1126 }
1127
1128 let source = match target {
1129 CopyTarget::Stdin => CopyFromSource::Stdin,
1130 CopyTarget::Expr(from) => {
1131 scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?;
1132
1133 let mut from_expr = from.clone();
1135 transform_ast::transform(scx, &mut from_expr)?;
1136 let relation_type = RelationDesc::empty();
1137 let ecx = &ExprContext {
1138 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
1139 name: "COPY FROM target",
1140 scope: &Scope::empty(),
1141 relation_type: relation_type.typ(),
1142 allow_aggregates: false,
1143 allow_subqueries: false,
1144 allow_parameters: false,
1145 allow_windows: false,
1146 };
1147 let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &ScalarType::String)?;
1148
1149 match options.aws_connection {
1150 Some(conn_id) => {
1151 let conn_id = CatalogItemId::from(conn_id);
1152
1153 let connection = match scx.get_item(&conn_id).connection()? {
1155 mz_storage_types::connections::Connection::Aws(conn) => conn,
1156 _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"),
1157 };
1158
1159 CopyFromSource::AwsS3 {
1160 uri: from,
1161 connection,
1162 connection_id: conn_id,
1163 }
1164 }
1165 None => CopyFromSource::Url(from),
1166 }
1167 }
1168 CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target),
1169 };
1170
1171 let params = match format {
1172 CopyFormat::Text => {
1173 only_available_with_csv(options.quote, "quote")?;
1174 only_available_with_csv(options.escape, "escape")?;
1175 only_available_with_csv(options.header, "HEADER")?;
1176 let delimiter =
1177 extract_byte_param_value(options.delimiter, "delimiter")?.unwrap_or(b'\t');
1178 let null = match options.null {
1179 Some(null) => Cow::from(null),
1180 None => Cow::from("\\N"),
1181 };
1182 CopyFormatParams::Text(CopyTextFormatParams { null, delimiter })
1183 }
1184 CopyFormat::Csv => {
1185 let quote = extract_byte_param_value(options.quote, "quote")?;
1186 let escape = extract_byte_param_value(options.escape, "escape")?;
1187 let delimiter = extract_byte_param_value(options.delimiter, "delimiter")?;
1188 CopyFormatParams::Csv(
1189 CopyCsvFormatParams::try_new(
1190 delimiter,
1191 quote,
1192 escape,
1193 options.header,
1194 options.null,
1195 )
1196 .map_err(|e| sql_err!("{}", e))?,
1197 )
1198 }
1199 CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
1200 CopyFormat::Parquet => CopyFormatParams::Parquet,
1201 };
1202
1203 let filter = match (options.files, options.pattern) {
1204 (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"),
1205 (Some(files), None) => Some(CopyFromFilter::Files(files)),
1206 (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)),
1207 (None, None) => None,
1208 };
1209
1210 if filter.is_some() && matches!(source, CopyFromSource::Stdin) {
1211 bail_unsupported!("COPY FROM ... WITH (FILES ...) only supported from a URL")
1212 }
1213
1214 let (id, source_desc, columns, maybe_mfp) = query::plan_copy_from(scx, table_name, columns)?;
1215
1216 let Some(mfp) = maybe_mfp else {
1217 sql_bail!("[internal error] COPY FROM ... expects an MFP to be produced");
1218 };
1219
1220 Ok(Plan::CopyFrom(CopyFromPlan {
1221 id,
1222 source,
1223 columns,
1224 source_desc,
1225 mfp,
1226 params,
1227 filter,
1228 }))
1229}
1230
1231fn extract_byte_param_value(v: Option<String>, param_name: &str) -> Result<Option<u8>, PlanError> {
1232 match v {
1233 Some(v) if v.len() == 1 => Ok(Some(v.as_bytes()[0])),
1234 Some(..) => sql_bail!("COPY {} must be a single one-byte character", param_name),
1235 None => Ok(None),
1236 }
1237}
1238
// Lists every `WITH (...)` option accepted by `COPY`, paired with the type its value is read
// as. `MaxFileSize` is the only entry that declares a default, `ByteSize::mb(256)`.
// NOTE(review): presumably this expands to the `CopyOptionExtracted` struct (one snake_case
// field per option) that `plan_copy` builds with `CopyOptionExtracted::try_from` — confirm
// against the macro definition.
generate_extracted_config!(
    CopyOption,
    (Format, String),
    (Delimiter, String),
    (Null, String),
    (Escape, String),
    (Quote, String),
    (Header, bool),
    (AwsConnection, with_options::Object),
    (MaxFileSize, ByteSize, Default(ByteSize::mb(256))),
    (Files, Vec<String>),
    (Pattern, String)
);
1252
1253pub fn plan_copy(
1254 scx: &StatementContext,
1255 CopyStatement {
1256 relation,
1257 direction,
1258 target,
1259 options,
1260 }: CopyStatement<Aug>,
1261) -> Result<Plan, PlanError> {
1262 let options = CopyOptionExtracted::try_from(options)?;
1263 let format = options
1266 .format
1267 .as_ref()
1268 .map(|format| match format.to_lowercase().as_str() {
1269 "text" => Ok(CopyFormat::Text),
1270 "csv" => Ok(CopyFormat::Csv),
1271 "binary" => Ok(CopyFormat::Binary),
1272 "parquet" => Ok(CopyFormat::Parquet),
1273 _ => sql_bail!("unknown FORMAT: {}", format),
1274 })
1275 .transpose()?;
1276
1277 match (&direction, &target) {
1278 (CopyDirection::To, CopyTarget::Stdout) => {
1279 if options.delimiter.is_some() {
1280 sql_bail!("COPY TO does not support DELIMITER option yet");
1281 }
1282 if options.quote.is_some() {
1283 sql_bail!("COPY TO does not support QUOTE option yet");
1284 }
1285 if options.null.is_some() {
1286 sql_bail!("COPY TO does not support NULL option yet");
1287 }
1288 match relation {
1289 CopyRelation::Named { .. } => sql_bail!("named with COPY TO STDOUT unsupported"),
1290 CopyRelation::Select(stmt) => Ok(plan_select(
1291 scx,
1292 stmt,
1293 &Params::empty(),
1294 Some(format.unwrap_or(CopyFormat::Text)),
1295 )?),
1296 CopyRelation::Subscribe(stmt) => Ok(plan_subscribe(
1297 scx,
1298 stmt,
1299 &Params::empty(),
1300 Some(format.unwrap_or(CopyFormat::Text)),
1301 )?),
1302 }
1303 }
1304 (CopyDirection::From, target) => match relation {
1305 CopyRelation::Named { name, columns } => plan_copy_from(
1306 scx,
1307 target,
1308 name,
1309 columns,
1310 format.unwrap_or(CopyFormat::Text),
1311 options,
1312 ),
1313 _ => sql_bail!("COPY FROM {} not supported", target),
1314 },
1315 (CopyDirection::To, CopyTarget::Expr(to_expr)) => {
1316 if !scx.catalog.active_role_id().is_system() {
1321 scx.require_feature_flag(&vars::ENABLE_COPY_TO_EXPR)?;
1322 }
1323
1324 let format = match format {
1325 Some(inner) => inner,
1326 _ => sql_bail!("COPY TO <expr> requires a FORMAT option"),
1327 };
1328
1329 let stmt = match relation {
1330 CopyRelation::Named { name, columns } => {
1331 if !columns.is_empty() {
1332 sql_bail!(
1334 "specifying columns for COPY <table_name> TO commands not yet supported; use COPY (SELECT...) TO ... instead"
1335 );
1336 }
1337 let query = Query {
1339 ctes: CteBlock::empty(),
1340 body: SetExpr::Table(name),
1341 order_by: vec![],
1342 limit: None,
1343 offset: None,
1344 };
1345 SelectStatement { query, as_of: None }
1346 }
1347 CopyRelation::Select(stmt) => {
1348 if !stmt.query.order_by.is_empty() {
1349 sql_bail!("ORDER BY is not supported in SELECT query for COPY statements")
1350 }
1351 stmt
1352 }
1353 _ => sql_bail!("COPY {} {} not supported", direction, target),
1354 };
1355
1356 let (plan, desc) = plan_select_inner(scx, stmt, &Params::empty(), None)?;
1357 plan_copy_to_expr(scx, plan, desc, to_expr, format, options)
1358 }
1359 _ => sql_bail!("COPY {} {} not supported", direction, target),
1360 }
1361}