1use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::virtual_syntax::AlgExcept;
50use mz_expr::{
51 Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, TableFunc,
52 func as expr_func,
53};
54use mz_ore::assert_none;
55use mz_ore::collections::CollectionExt;
56use mz_ore::error::ErrorExt;
57use mz_ore::id_gen::IdGen;
58use mz_ore::option::FallibleMapExt;
59use mz_ore::stack::{CheckedRecursion, RecursionGuard};
60use mz_ore::str::StrExt;
61use mz_repr::adt::char::CharLength;
62use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
63use mz_repr::adt::timestamp::TimestampPrecision;
64use mz_repr::adt::varchar::VarCharMaxLength;
65use mz_repr::{
66 CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector, Row,
67 RowArena, SqlColumnType, SqlRelationType, SqlScalarType, UNKNOWN_COLUMN_NAME, strconv,
68};
69use mz_sql_parser::ast::display::AstDisplay;
70use mz_sql_parser::ast::visit::Visit;
71use mz_sql_parser::ast::visit_mut::{self, VisitMut};
72use mz_sql_parser::ast::{
73 AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
74 CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
75 Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
76 JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
77 MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
78 SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
79 TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
80 WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
81};
82use mz_sql_parser::ident;
83
84use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
85use crate::func::{self, Func, FuncSpec, TableFuncImpl};
86use crate::names::{
87 Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
88};
89use crate::plan::PlanError::InvalidWmrRecursionLimit;
90use crate::plan::error::PlanError;
91use crate::plan::hir::{
92 AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
93 BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
94 HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
95 ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
96};
97use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
98use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
99use crate::plan::statement::{StatementContext, StatementDesc, show};
100use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
101use crate::plan::{
102 Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
103 literal, transform_ast,
104};
105use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
106use crate::session::vars::{self, FeatureFlag};
107use crate::{ORDINALITY_COL_NAME, normalize};
108
109#[derive(Debug)]
110pub struct PlannedRootQuery<E> {
111 pub expr: E,
112 pub desc: RelationDesc,
113 pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
114 pub scope: Scope,
115}
116
117#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
126pub fn plan_root_query(
127 scx: &StatementContext,
128 mut query: Query<Aug>,
129 lifetime: QueryLifetime,
130) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
131 transform_ast::transform(scx, &mut query)?;
132 let mut qcx = QueryContext::root(scx, lifetime);
133 let PlannedQuery {
134 mut expr,
135 scope,
136 order_by,
137 limit,
138 offset,
139 project,
140 group_size_hints,
141 } = plan_query(&mut qcx, &query)?;
142
143 let mut finishing = RowSetFinishing {
144 limit,
145 offset,
146 project,
147 order_by,
148 };
149
150 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
156
157 if lifetime.is_maintained() {
158 expr.finish_maintained(&mut finishing, group_size_hints);
159 }
160
161 let typ = qcx.relation_type(&expr);
162 let typ = SqlRelationType::new(
163 finishing
164 .project
165 .iter()
166 .map(|i| typ.column_types[*i].clone())
167 .collect(),
168 );
169 let desc = RelationDesc::new(typ, scope.column_names());
170
171 Ok(PlannedRootQuery {
172 expr,
173 desc,
174 finishing,
175 scope,
176 })
177}
178
179#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
181pub fn plan_ct_query(
182 qcx: &mut QueryContext,
183 mut query: Query<Aug>,
184) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
185 transform_ast::transform(qcx.scx, &mut query)?;
186 let PlannedQuery {
187 mut expr,
188 scope,
189 order_by,
190 limit,
191 offset,
192 project,
193 group_size_hints,
194 } = plan_query(qcx, &query)?;
195
196 let mut finishing = RowSetFinishing {
197 limit,
198 offset,
199 project,
200 order_by,
201 };
202
203 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
209
210 expr.finish_maintained(&mut finishing, group_size_hints);
211
212 let typ = qcx.relation_type(&expr);
213 let typ = SqlRelationType::new(
214 finishing
215 .project
216 .iter()
217 .map(|i| typ.column_types[*i].clone())
218 .collect(),
219 );
220 let desc = RelationDesc::new(typ, scope.column_names());
221
222 Ok(PlannedRootQuery {
223 expr,
224 desc,
225 finishing,
226 scope,
227 })
228}
229
230fn try_push_projection_order_by(
241 expr: &mut HirRelationExpr,
242 project: &mut Vec<usize>,
243 order_by: &mut Vec<ColumnOrder>,
244) -> bool {
245 let mut unproject = vec![None; expr.arity()];
246 for (out_i, in_i) in project.iter().copied().enumerate() {
247 unproject[in_i] = Some(out_i);
248 }
249 if order_by
250 .iter()
251 .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
252 {
253 let trivial_project = (0..project.len()).collect();
254 *expr = expr.take().project(mem::replace(project, trivial_project));
255 for ob in order_by {
256 ob.column = unproject[ob.column].unwrap();
257 }
258 true
259 } else {
260 false
261 }
262}
263
264pub fn plan_insert_query(
265 scx: &StatementContext,
266 table_name: ResolvedItemName,
267 columns: Vec<Ident>,
268 source: InsertSource<Aug>,
269 returning: Vec<SelectItem<Aug>>,
270) -> Result<
271 (
272 CatalogItemId,
273 HirRelationExpr,
274 PlannedRootQuery<Vec<HirScalarExpr>>,
275 ),
276 PlanError,
277> {
278 let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
279 let table = scx.get_item_by_resolved_name(&table_name)?;
280
281 if table.item_type() != CatalogItemType::Table {
283 sql_bail!(
284 "cannot insert into {} '{}'",
285 table.item_type(),
286 table_name.full_name_str()
287 );
288 }
289 let desc = table.relation_desc().expect("table has desc");
290 let mut defaults = table
291 .writable_table_details()
292 .ok_or_else(|| {
293 sql_err!(
294 "cannot insert into non-writeable table '{}'",
295 table_name.full_name_str()
296 )
297 })?
298 .to_vec();
299
300 for default in &mut defaults {
301 transform_ast::transform(scx, default)?;
302 }
303
304 if table.id().is_system() {
305 sql_bail!(
306 "cannot insert into system table '{}'",
307 table_name.full_name_str()
308 );
309 }
310
311 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
312
313 let mut source_types = Vec::with_capacity(columns.len());
315 let mut ordering = Vec::with_capacity(columns.len());
316
317 if columns.is_empty() {
318 source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
321 ordering.extend(0..desc.arity());
322 } else {
323 let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
324 .iter()
325 .enumerate()
326 .map(|(idx, (name, typ))| (name, (idx, typ)))
327 .collect();
328
329 for c in &columns {
330 if let Some((idx, typ)) = column_by_name.get(c) {
331 ordering.push(*idx);
332 source_types.push(&typ.scalar_type);
333 } else {
334 sql_bail!(
335 "column {} of relation {} does not exist",
336 c.quoted(),
337 table_name.full_name_str().quoted()
338 );
339 }
340 }
341 if let Some(dup) = columns.iter().duplicates().next() {
342 sql_bail!("column {} specified more than once", dup.quoted());
343 }
344 };
345
346 let expr = match source {
348 InsertSource::Query(mut query) => {
349 transform_ast::transform(scx, &mut query)?;
350
351 match query {
352 Query {
354 body: SetExpr::Values(Values(values)),
355 ctes,
356 order_by,
357 limit: None,
358 offset: None,
359 } if ctes.is_empty() && order_by.is_empty() => {
360 let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
361 plan_values_insert(&qcx, &names, &source_types, &values)?
362 }
363 _ => {
364 let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
365 expr
366 }
367 }
368 }
369 InsertSource::DefaultValues => {
370 HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
371 }
372 };
373
374 let expr_arity = expr.arity();
375
376 let max_columns = if columns.is_empty() {
379 desc.arity()
380 } else {
381 columns.len()
382 };
383 if expr_arity > max_columns {
384 sql_bail!("INSERT has more expressions than target columns");
385 }
386 if expr_arity < columns.len() {
388 sql_bail!("INSERT has more target columns than expressions");
389 }
390
391 source_types.truncate(expr_arity);
393 ordering.truncate(expr_arity);
394
395 let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
398 sql_err!(
399 "column {} is of type {} but expression is of type {}",
400 desc.get_name(ordering[e.column]).quoted(),
401 qcx.humanize_scalar_type(&e.target_type, false),
402 qcx.humanize_scalar_type(&e.source_type, false),
403 )
404 })?;
405
406 let mut map_exprs = vec![];
408 let mut project_key = Vec::with_capacity(desc.arity());
409
410 let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();
412
413 let column_details = desc.iter_types().zip_eq(defaults).enumerate();
414 for (col_idx, (col_typ, default)) in column_details {
415 if let Some(src_idx) = col_to_source.get(&col_idx) {
416 project_key.push(*src_idx);
417 } else {
418 let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
419 project_key.push(expr_arity + map_exprs.len());
420 map_exprs.push(hir);
421 }
422 }
423
424 let returning = {
425 let (scope, typ) = if let ResolvedItemName::Item {
426 full_name,
427 version: _,
428 ..
429 } = table_name
430 {
431 let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
432 let typ = desc.typ().clone();
433 (scope, typ)
434 } else {
435 (Scope::empty(), SqlRelationType::empty())
436 };
437 let ecx = &ExprContext {
438 qcx: &qcx,
439 name: "RETURNING clause",
440 scope: &scope,
441 relation_type: &typ,
442 allow_aggregates: false,
443 allow_subqueries: false,
444 allow_parameters: true,
445 allow_windows: false,
446 };
447 let table_func_names = BTreeMap::new();
448 let mut output_columns = vec![];
449 let mut new_exprs = vec![];
450 let mut new_type = SqlRelationType::empty();
451 for mut si in returning {
452 transform_ast::transform(scx, &mut si)?;
453 for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
454 let expr = match &select_item {
455 ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
456 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
457 };
458 output_columns.push(column_name);
459 let typ = ecx.column_type(&expr);
460 new_type.column_types.push(typ);
461 new_exprs.push(expr);
462 }
463 }
464 let desc = RelationDesc::new(new_type, output_columns);
465 let desc_arity = desc.arity();
466 PlannedRootQuery {
467 expr: new_exprs,
468 desc,
469 finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
470 scope,
471 }
472 };
473
474 Ok((
475 table.id(),
476 expr.map(map_exprs).project(project_key),
477 returning,
478 ))
479}
480
481pub fn plan_copy_item(
492 scx: &StatementContext,
493 item_name: ResolvedItemName,
494 columns: Vec<Ident>,
495) -> Result<
496 (
497 CatalogItemId,
498 RelationDesc,
499 Vec<ColumnIndex>,
500 Option<MapFilterProject>,
501 ),
502 PlanError,
503> {
504 let item = scx.get_item_by_resolved_name(&item_name)?;
505 let fullname = scx.catalog.resolve_full_name(item.name());
506 let table_desc = match item.relation_desc() {
507 Some(desc) => desc.into_owned(),
508 None => {
509 return Err(PlanError::InvalidDependency {
510 name: fullname.to_string(),
511 item_type: item.item_type().to_string(),
512 });
513 }
514 };
515 let mut ordering = Vec::with_capacity(columns.len());
516
517 let mfp = if let Some(table_defaults) = item.writable_table_details() {
527 let mut table_defaults = table_defaults.to_vec();
528
529 for default in &mut table_defaults {
530 transform_ast::transform(scx, default)?;
531 }
532
533 let source_column_names: Vec<_> = columns
535 .iter()
536 .cloned()
537 .map(normalize::column_name)
538 .collect();
539
540 let mut default_exprs = Vec::new();
541 let mut project_keys = Vec::with_capacity(table_desc.arity());
542
543 let column_details = table_desc.iter().zip_eq(table_defaults);
546 for ((col_name, col_type), col_default) in column_details {
547 let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
548 if let Some(src_idx) = maybe_src_idx {
549 project_keys.push(src_idx);
550 } else {
551 let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
554 let mir = hir.lower_uncorrelated(scx.catalog.system_vars())?;
555 project_keys.push(source_column_names.len() + default_exprs.len());
556 default_exprs.push(mir);
557 }
558 }
559
560 let mfp = MapFilterProject::new(source_column_names.len())
561 .map(default_exprs)
562 .project(project_keys);
563 Some(mfp)
564 } else {
565 None
566 };
567
568 let source_desc = if columns.is_empty() {
570 let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
571 ordering.extend(indexes);
572
573 table_desc
575 } else {
576 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
577 let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
578 .iter_all()
579 .map(|(idx, name, typ)| (name, (*idx, typ)))
580 .collect();
581
582 let mut names = Vec::with_capacity(columns.len());
583 let mut source_types = Vec::with_capacity(columns.len());
584
585 for c in &columns {
586 if let Some((idx, typ)) = column_by_name.get(c) {
587 ordering.push(*idx);
588 source_types.push((*typ).clone());
589 names.push(c.clone());
590 } else {
591 sql_bail!(
592 "column {} of relation {} does not exist",
593 c.quoted(),
594 item_name.full_name_str().quoted()
595 );
596 }
597 }
598 if let Some(dup) = columns.iter().duplicates().next() {
599 sql_bail!("column {} specified more than once", dup.quoted());
600 }
601
602 RelationDesc::new(SqlRelationType::new(source_types), names)
604 };
605
606 Ok((item.id(), source_desc, ordering, mfp))
607}
608
609pub fn plan_copy_from(
613 scx: &StatementContext,
614 table_name: ResolvedItemName,
615 columns: Vec<Ident>,
616) -> Result<
617 (
618 CatalogItemId,
619 RelationDesc,
620 Vec<ColumnIndex>,
621 Option<MapFilterProject>,
622 ),
623 PlanError,
624> {
625 let table = scx.get_item_by_resolved_name(&table_name)?;
626
627 if table.item_type() != CatalogItemType::Table {
629 sql_bail!(
630 "cannot insert into {} '{}'",
631 table.item_type(),
632 table_name.full_name_str()
633 );
634 }
635
636 let _ = table.writable_table_details().ok_or_else(|| {
637 sql_err!(
638 "cannot insert into non-writeable table '{}'",
639 table_name.full_name_str()
640 )
641 })?;
642
643 if table.id().is_system() {
644 sql_bail!(
645 "cannot insert into system table '{}'",
646 table_name.full_name_str()
647 );
648 }
649 let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
650
651 Ok((id, desc, ordering, mfp))
652}
653
654pub fn plan_copy_from_rows(
657 pcx: &PlanContext,
658 catalog: &dyn SessionCatalog,
659 target_id: CatalogItemId,
660 target_name: String,
661 columns: Vec<ColumnIndex>,
662 rows: Vec<mz_repr::Row>,
663) -> Result<HirRelationExpr, PlanError> {
664 let scx = StatementContext::new(Some(pcx), catalog);
665
666 let table = catalog
668 .try_get_item(&target_id)
669 .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
670 .at_version(RelationVersionSelector::Latest);
671
672 let mut defaults = table
673 .writable_table_details()
674 .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
675 .to_vec();
676
677 for default in &mut defaults {
678 transform_ast::transform(&scx, default)?;
679 }
680
681 let desc = table.relation_desc().expect("table has desc");
682 let column_types = columns
683 .iter()
684 .map(|x| desc.get_type(x).clone())
685 .map(|mut x| {
686 x.nullable = true;
689 x
690 })
691 .collect();
692 let typ = SqlRelationType::new(column_types);
693 let expr = HirRelationExpr::Constant {
694 rows,
695 typ: typ.clone(),
696 };
697
698 let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
704 if columns == default {
705 return Ok(expr);
706 }
707
708 let mut map_exprs = vec![];
710 let mut project_key = Vec::with_capacity(desc.arity());
711
712 let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
714
715 let column_details = desc.iter_all().zip_eq(defaults);
716 for ((col_idx, _col_name, col_typ), default) in column_details {
717 if let Some(src_idx) = col_to_source.get(&col_idx) {
718 project_key.push(*src_idx);
719 } else {
720 let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
721 project_key.push(typ.arity() + map_exprs.len());
722 map_exprs.push(hir);
723 }
724 }
725
726 Ok(expr.map(map_exprs).project(project_key))
727}
728
729pub struct ReadThenWritePlan {
731 pub id: CatalogItemId,
732 pub selection: HirRelationExpr,
737 pub assignments: BTreeMap<usize, HirScalarExpr>,
739 pub finishing: RowSetFinishing,
740}
741
742pub fn plan_delete_query(
743 scx: &StatementContext,
744 mut delete_stmt: DeleteStatement<Aug>,
745) -> Result<ReadThenWritePlan, PlanError> {
746 transform_ast::transform(scx, &mut delete_stmt)?;
747
748 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
749 plan_mutation_query_inner(
750 qcx,
751 delete_stmt.table_name,
752 delete_stmt.alias,
753 delete_stmt.using,
754 vec![],
755 delete_stmt.selection,
756 )
757}
758
759pub fn plan_update_query(
760 scx: &StatementContext,
761 mut update_stmt: UpdateStatement<Aug>,
762) -> Result<ReadThenWritePlan, PlanError> {
763 transform_ast::transform(scx, &mut update_stmt)?;
764
765 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
766
767 plan_mutation_query_inner(
768 qcx,
769 update_stmt.table_name,
770 update_stmt.alias,
771 vec![],
772 update_stmt.assignments,
773 update_stmt.selection,
774 )
775}
776
777pub fn plan_mutation_query_inner(
778 qcx: QueryContext,
779 table_name: ResolvedItemName,
780 alias: Option<TableAlias>,
781 using: Vec<TableWithJoins<Aug>>,
782 assignments: Vec<Assignment<Aug>>,
783 selection: Option<Expr<Aug>>,
784) -> Result<ReadThenWritePlan, PlanError> {
785 let (id, version) = match table_name {
787 ResolvedItemName::Item { id, version, .. } => (id, version),
788 _ => sql_bail!("cannot mutate non-user table"),
789 };
790
791 let item = qcx.scx.get_item(&id).at_version(version);
793 if item.item_type() != CatalogItemType::Table {
794 sql_bail!(
795 "cannot mutate {} '{}'",
796 item.item_type(),
797 table_name.full_name_str()
798 );
799 }
800 let _ = item.writable_table_details().ok_or_else(|| {
801 sql_err!(
802 "cannot mutate non-writeable table '{}'",
803 table_name.full_name_str()
804 )
805 })?;
806 if id.is_system() {
807 sql_bail!(
808 "cannot mutate system table '{}'",
809 table_name.full_name_str()
810 );
811 }
812
813 let (mut get, scope) = qcx.resolve_table_name(table_name)?;
815 let scope = plan_table_alias(scope, alias.as_ref())?;
816 let desc = item.relation_desc().expect("table has desc");
817 let relation_type = qcx.relation_type(&get);
818
819 if using.is_empty() {
820 if let Some(expr) = selection {
821 let ecx = &ExprContext {
822 qcx: &qcx,
823 name: "WHERE clause",
824 scope: &scope,
825 relation_type: &relation_type,
826 allow_aggregates: false,
827 allow_subqueries: true,
828 allow_parameters: true,
829 allow_windows: false,
830 };
831 let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
832 get = get.filter(vec![expr]);
833 }
834 } else {
835 get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
836 }
837
838 let mut sets = BTreeMap::new();
839 for Assignment { id, value } in assignments {
840 let name = normalize::column_name(id);
842 match desc.get_by_name(&name) {
843 Some((idx, typ)) => {
844 let ecx = &ExprContext {
845 qcx: &qcx,
846 name: "SET clause",
847 scope: &scope,
848 relation_type: &relation_type,
849 allow_aggregates: false,
850 allow_subqueries: false,
851 allow_parameters: true,
852 allow_windows: false,
853 };
854 let expr = plan_expr(ecx, &value)?.cast_to(
855 ecx,
856 CastContext::Assignment,
857 &typ.scalar_type,
858 )?;
859
860 if sets.insert(idx, expr).is_some() {
861 sql_bail!("column {} set twice", name)
862 }
863 }
864 None => sql_bail!("unknown column {}", name),
865 };
866 }
867
868 let finishing = RowSetFinishing {
869 order_by: vec![],
870 limit: None,
871 offset: 0,
872 project: (0..desc.arity()).collect(),
873 };
874
875 Ok(ReadThenWritePlan {
876 id,
877 selection: get,
878 finishing,
879 assignments: sets,
880 })
881}
882
883fn handle_mutation_using_clause(
895 qcx: &QueryContext,
896 selection: Option<Expr<Aug>>,
897 using: Vec<TableWithJoins<Aug>>,
898 get: HirRelationExpr,
899 outer_scope: Scope,
900) -> Result<HirRelationExpr, PlanError> {
901 let (mut using_rel_expr, using_scope) =
905 using.into_iter().try_fold(plan_join_identity(), |l, twj| {
906 let (left, left_scope) = l;
907 plan_join(
908 qcx,
909 left,
910 left_scope,
911 &Join {
912 relation: TableFactor::NestedJoin {
913 join: Box::new(twj),
914 alias: None,
915 },
916 join_operator: JoinOperator::CrossJoin,
917 },
918 )
919 })?;
920
921 if let Some(expr) = selection {
922 let on = HirScalarExpr::literal_true();
928 let joined = using_rel_expr
929 .clone()
930 .join(get.clone(), on, JoinKind::Inner);
931 let joined_scope = using_scope.product(outer_scope)?;
932 let joined_relation_type = qcx.relation_type(&joined);
933
934 let ecx = &ExprContext {
935 qcx,
936 name: "WHERE clause",
937 scope: &joined_scope,
938 relation_type: &joined_relation_type,
939 allow_aggregates: false,
940 allow_subqueries: true,
941 allow_parameters: true,
942 allow_windows: false,
943 };
944
945 let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
947
948 let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
952 use mz_expr::visit::Visit;
954 expr.visit_mut_post(&mut |e| {
955 if let HirScalarExpr::Column(c, _name) = e {
956 if c.column >= using_rel_arity {
957 c.level += 1;
958 c.column -= using_rel_arity;
959 };
960 }
961 })?;
962
963 using_rel_expr = using_rel_expr.filter(vec![expr]);
967 } else {
968 let _joined_scope = using_scope.product(outer_scope)?;
971 }
972 Ok(get.filter(vec![using_rel_expr.exists()]))
983}
984
985#[derive(Debug)]
986pub(crate) struct CastRelationError {
987 pub(crate) column: usize,
988 pub(crate) source_type: SqlScalarType,
989 pub(crate) target_type: SqlScalarType,
990}
991
992pub(crate) fn cast_relation<'a, I>(
996 qcx: &QueryContext,
997 ccx: CastContext,
998 expr: HirRelationExpr,
999 target_types: I,
1000) -> Result<HirRelationExpr, CastRelationError>
1001where
1002 I: IntoIterator<Item = &'a SqlScalarType>,
1003{
1004 let ecx = &ExprContext {
1005 qcx,
1006 name: "values",
1007 scope: &Scope::empty(),
1008 relation_type: &qcx.relation_type(&expr),
1009 allow_aggregates: false,
1010 allow_subqueries: true,
1011 allow_parameters: true,
1012 allow_windows: false,
1013 };
1014 let mut map_exprs = vec![];
1015 let mut project_key = vec![];
1016 for (i, target_typ) in target_types.into_iter().enumerate() {
1017 let expr = HirScalarExpr::column(i);
1018 match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
1022 Ok(cast_expr) => {
1023 if expr == cast_expr {
1024 project_key.push(i);
1026 } else {
1027 project_key.push(ecx.relation_type.arity() + map_exprs.len());
1029 map_exprs.push(cast_expr);
1030 }
1031 }
1032 Err(_) => {
1033 return Err(CastRelationError {
1034 column: i,
1035 source_type: ecx.scalar_type(&expr),
1036 target_type: target_typ.clone(),
1037 });
1038 }
1039 }
1040 }
1041 Ok(expr.map(map_exprs).project(project_key))
1042}
1043
1044pub fn plan_as_of(
1047 scx: &StatementContext,
1048 as_of: Option<AsOf<Aug>>,
1049) -> Result<QueryWhen, PlanError> {
1050 match as_of {
1051 None => Ok(QueryWhen::Immediately),
1052 Some(as_of) => match as_of {
1053 AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1054 AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1055 },
1056 }
1057}
1058
1059pub fn plan_as_of_or_up_to(
1069 scx: &StatementContext,
1070 mut expr: Expr<Aug>,
1071) -> Result<mz_repr::Timestamp, PlanError> {
1072 let scope = Scope::empty();
1073 let desc = RelationDesc::empty();
1074 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1077 transform_ast::transform(scx, &mut expr)?;
1078 let ecx = &ExprContext {
1079 qcx: &qcx,
1080 name: "AS OF or UP TO",
1081 scope: &scope,
1082 relation_type: desc.typ(),
1083 allow_aggregates: false,
1084 allow_subqueries: false,
1085 allow_parameters: false,
1086 allow_windows: false,
1087 };
1088 let hir = plan_expr(ecx, &expr)?.cast_to(
1089 ecx,
1090 CastContext::Assignment,
1091 &SqlScalarType::MzTimestamp,
1092 )?;
1093 if hir.contains_unmaterializable() {
1094 bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1095 }
1096 let timestamp = hir
1103 .into_literal_mz_timestamp()
1104 .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1105 Ok(timestamp)
1106}
1107
1108pub fn plan_secret_as(
1110 scx: &StatementContext,
1111 mut expr: Expr<Aug>,
1112) -> Result<MirScalarExpr, PlanError> {
1113 let scope = Scope::empty();
1114 let desc = RelationDesc::empty();
1115 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1116
1117 transform_ast::transform(scx, &mut expr)?;
1118
1119 let ecx = &ExprContext {
1120 qcx: &qcx,
1121 name: "AS",
1122 scope: &scope,
1123 relation_type: desc.typ(),
1124 allow_aggregates: false,
1125 allow_subqueries: false,
1126 allow_parameters: false,
1127 allow_windows: false,
1128 };
1129 let expr = plan_expr(ecx, &expr)?
1130 .type_as(ecx, &SqlScalarType::Bytes)?
1131 .lower_uncorrelated(scx.catalog.system_vars())?;
1132 Ok(expr)
1133}
1134
1135pub fn plan_webhook_validate_using(
1137 scx: &StatementContext,
1138 validate_using: CreateWebhookSourceCheck<Aug>,
1139) -> Result<WebhookValidation, PlanError> {
1140 let qcx = QueryContext::root(scx, QueryLifetime::Source);
1141
1142 let CreateWebhookSourceCheck {
1143 options,
1144 using: mut expr,
1145 } = validate_using;
1146
1147 let mut column_typs = vec![];
1148 let mut column_names = vec![];
1149
1150 let (bodies, headers, secrets) = options
1151 .map(|o| (o.bodies, o.headers, o.secrets))
1152 .unwrap_or_default();
1153
1154 let mut body_tuples = vec![];
1156 for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1157 let scalar_type = use_bytes
1158 .then_some(SqlScalarType::Bytes)
1159 .unwrap_or(SqlScalarType::String);
1160 let name = alias
1161 .map(|a| a.into_string())
1162 .unwrap_or_else(|| "body".to_string());
1163
1164 column_typs.push(SqlColumnType {
1165 scalar_type,
1166 nullable: false,
1167 });
1168 column_names.push(name);
1169
1170 let column_idx = column_typs.len() - 1;
1172 assert_eq!(
1174 column_idx,
1175 column_names.len() - 1,
1176 "body column names and types don't match"
1177 );
1178 body_tuples.push((column_idx, use_bytes));
1179 }
1180
1181 let mut header_tuples = vec![];
1183
1184 for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1185 let value_type = use_bytes
1186 .then_some(SqlScalarType::Bytes)
1187 .unwrap_or(SqlScalarType::String);
1188 let name = alias
1189 .map(|a| a.into_string())
1190 .unwrap_or_else(|| "headers".to_string());
1191
1192 column_typs.push(SqlColumnType {
1193 scalar_type: SqlScalarType::Map {
1194 value_type: Box::new(value_type),
1195 custom_id: None,
1196 },
1197 nullable: false,
1198 });
1199 column_names.push(name);
1200
1201 let column_idx = column_typs.len() - 1;
1203 assert_eq!(
1205 column_idx,
1206 column_names.len() - 1,
1207 "header column names and types don't match"
1208 );
1209 header_tuples.push((column_idx, use_bytes));
1210 }
1211
1212 let mut validation_secrets = vec![];
1214
1215 for CreateWebhookSourceSecret {
1216 secret,
1217 alias,
1218 use_bytes,
1219 } in secrets
1220 {
1221 let scalar_type = use_bytes
1223 .then_some(SqlScalarType::Bytes)
1224 .unwrap_or(SqlScalarType::String);
1225
1226 column_typs.push(SqlColumnType {
1227 scalar_type,
1228 nullable: false,
1229 });
1230 let ResolvedItemName::Item {
1231 id,
1232 full_name: FullItemName { item, .. },
1233 ..
1234 } = secret
1235 else {
1236 return Err(PlanError::InvalidSecret(Box::new(secret)));
1237 };
1238
1239 let name = if let Some(alias) = alias {
1241 alias.into_string()
1242 } else {
1243 item
1244 };
1245 column_names.push(name);
1246
1247 let column_idx = column_typs.len() - 1;
1250 assert_eq!(
1252 column_idx,
1253 column_names.len() - 1,
1254 "column names and types don't match"
1255 );
1256
1257 validation_secrets.push(WebhookValidationSecret {
1258 id,
1259 column_idx,
1260 use_bytes,
1261 });
1262 }
1263
1264 let relation_typ = SqlRelationType::new(column_typs);
1265 let desc = RelationDesc::new(relation_typ, column_names.clone());
1266 let scope = Scope::from_source(None, column_names);
1267
1268 transform_ast::transform(scx, &mut expr)?;
1269
1270 let ecx = &ExprContext {
1271 qcx: &qcx,
1272 name: "CHECK",
1273 scope: &scope,
1274 relation_type: desc.typ(),
1275 allow_aggregates: false,
1276 allow_subqueries: false,
1277 allow_parameters: false,
1278 allow_windows: false,
1279 };
1280 let expr = plan_expr(ecx, &expr)?
1281 .type_as(ecx, &SqlScalarType::Bool)?
1282 .lower_uncorrelated(scx.catalog.system_vars())?;
1283 let validation = WebhookValidation {
1284 expression: expr,
1285 relation_desc: desc,
1286 bodies: body_tuples,
1287 headers: header_tuples,
1288 secrets: validation_secrets,
1289 };
1290 Ok(validation)
1291}
1292
1293pub fn plan_default_expr(
1294 scx: &StatementContext,
1295 expr: &Expr<Aug>,
1296 target_ty: &SqlScalarType,
1297) -> Result<HirScalarExpr, PlanError> {
1298 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1299 let ecx = &ExprContext {
1300 qcx: &qcx,
1301 name: "DEFAULT expression",
1302 scope: &Scope::empty(),
1303 relation_type: &SqlRelationType::empty(),
1304 allow_aggregates: false,
1305 allow_subqueries: false,
1306 allow_parameters: false,
1307 allow_windows: false,
1308 };
1309 let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1310 Ok(hir)
1311}
1312
1313pub fn plan_params<'a>(
1314 scx: &'a StatementContext,
1315 params: Vec<Expr<Aug>>,
1316 desc: &StatementDesc,
1317) -> Result<Params, PlanError> {
1318 if params.len() != desc.param_types.len() {
1319 sql_bail!(
1320 "expected {} params, got {}",
1321 desc.param_types.len(),
1322 params.len()
1323 );
1324 }
1325
1326 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1327
1328 let mut datums = Row::default();
1329 let mut packer = datums.packer();
1330 let mut actual_types = Vec::new();
1331 let temp_storage = &RowArena::new();
1332 for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1333 transform_ast::transform(scx, &mut expr)?;
1334
1335 let ecx = execute_expr_context(&qcx);
1336 let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1337 let actual_ty = ecx.scalar_type(&ex);
1338 if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1339 return Err(PlanError::WrongParameterType(
1340 i + 1,
1341 ecx.humanize_scalar_type(expected_ty, false),
1342 ecx.humanize_scalar_type(&actual_ty, false),
1343 ));
1344 }
1345 let ex = ex.lower_uncorrelated(scx.catalog.system_vars())?;
1346 let evaled = ex.eval(&[], temp_storage)?;
1347 packer.push(evaled);
1348 actual_types.push(actual_ty);
1349 }
1350 Ok(Params {
1351 datums,
1352 execute_types: actual_types,
1353 expected_types: desc.param_types.clone(),
1354 })
1355}
1356
1357static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
1358static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1359
1360pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1362 ExprContext {
1363 qcx,
1364 name: "EXECUTE",
1365 scope: &EXECUTE_CONTEXT_SCOPE,
1366 relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1367 allow_aggregates: false,
1368 allow_subqueries: false,
1369 allow_parameters: false,
1370 allow_windows: false,
1371 }
1372}
1373
1374pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
1379 LazyLock::new(|| CastContext::Assignment);
1380
1381pub fn plan_index_exprs<'a>(
1382 scx: &'a StatementContext,
1383 on_desc: &RelationDesc,
1384 exprs: Vec<Expr<Aug>>,
1385) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1386 let scope = Scope::from_source(None, on_desc.iter_names());
1387 let qcx = QueryContext::root(scx, QueryLifetime::Index);
1388
1389 let ecx = &ExprContext {
1390 qcx: &qcx,
1391 name: "CREATE INDEX",
1392 scope: &scope,
1393 relation_type: on_desc.typ(),
1394 allow_aggregates: false,
1395 allow_subqueries: false,
1396 allow_parameters: false,
1397 allow_windows: false,
1398 };
1399 let mut out = vec![];
1400 for mut expr in exprs {
1401 transform_ast::transform(scx, &mut expr)?;
1402 let expr = plan_expr_or_col_index(ecx, &expr)?;
1403 let mut expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
1404 expr.reduce(&on_desc.typ().column_types);
1405 out.push(expr);
1406 }
1407 Ok(out)
1408}
1409
1410fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1411 match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1412 Some(column) => Ok(HirScalarExpr::column(column)),
1413 _ => plan_expr(ecx, e)?.type_as_any(ecx),
1414 }
1415}
1416
1417fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1418 match e {
1419 Expr::Value(Value::Number(n)) => {
1420 let n = n.parse::<usize>().map_err(|e| {
1421 sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1422 })?;
1423 if n < 1 || n > max {
1424 sql_bail!(
1425 "column reference {} in {} is out of range (1 - {})",
1426 n,
1427 name,
1428 max
1429 );
1430 }
1431 Ok(Some(n - 1))
1432 }
1433 _ => Ok(None),
1434 }
1435}
1436
1437struct PlannedQuery {
1438 expr: HirRelationExpr,
1439 scope: Scope,
1440 order_by: Vec<ColumnOrder>,
1441 limit: Option<HirScalarExpr>,
1442 offset: HirScalarExpr,
1448 project: Vec<usize>,
1449 group_size_hints: GroupSizeHints,
1450}
1451
1452fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1453 qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1454}
1455
1456fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1457 let cte_bindings = plan_ctes(qcx, q)?;
1460
1461 let limit = match &q.limit {
1462 None => None,
1463 Some(Limit {
1464 quantity,
1465 with_ties: false,
1466 }) => {
1467 let ecx = &ExprContext {
1468 qcx,
1469 name: "LIMIT",
1470 scope: &Scope::empty(),
1471 relation_type: &SqlRelationType::empty(),
1472 allow_aggregates: false,
1473 allow_subqueries: true,
1474 allow_parameters: true,
1475 allow_windows: false,
1476 };
1477 let limit = plan_expr(ecx, quantity)?;
1478 let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1479
1480 let limit = if limit.is_constant() {
1481 let arena = RowArena::new();
1482 let limit = limit.lower_uncorrelated(qcx.scx.catalog.system_vars())?;
1483
1484 match limit.eval(&[], &arena)? {
1488 d @ Datum::Int64(v) if v >= 0 => {
1489 HirScalarExpr::literal(d, SqlScalarType::Int64)
1490 }
1491 d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1492 Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1493 _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1494 }
1495 } else {
1496 qcx.scx
1498 .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1499 limit
1500 };
1501
1502 Some(limit)
1503 }
1504 Some(Limit {
1505 quantity: _,
1506 with_ties: true,
1507 }) => bail_unsupported!("FETCH ... WITH TIES"),
1508 };
1509
1510 let offset = match &q.offset {
1511 None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1512 Some(offset) => {
1513 let ecx = &ExprContext {
1514 qcx,
1515 name: "OFFSET",
1516 scope: &Scope::empty(),
1517 relation_type: &SqlRelationType::empty(),
1518 allow_aggregates: false,
1519 allow_subqueries: false,
1520 allow_parameters: true,
1521 allow_windows: false,
1522 };
1523 let offset = plan_expr(ecx, offset)?;
1524 let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1525
1526 let offset = if offset.is_constant() {
1527 let offset_value = offset_into_value(offset)?;
1529 HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1530 } else {
1531 if !offset.contains_parameters() {
1535 return Err(PlanError::InvalidOffset(format!(
1536 "must be simplifiable to a constant, possibly after parameter binding, got {}",
1537 offset
1538 )));
1539 }
1540 offset
1541 };
1542 offset
1543 }
1544 };
1545
1546 let mut planned_query = match &q.body {
1547 SetExpr::Select(s) => {
1548 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1550 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1551
1552 let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1553 PlannedQuery {
1554 expr: plan.expr,
1555 scope: plan.scope,
1556 order_by: plan.order_by,
1557 project: plan.project,
1558 limit,
1559 offset,
1560 group_size_hints,
1561 }
1562 }
1563 _ => {
1564 let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1565 let ecx = &ExprContext {
1566 qcx,
1567 name: "ORDER BY clause of a set expression",
1568 scope: &scope,
1569 relation_type: &qcx.relation_type(&expr),
1570 allow_aggregates: false,
1571 allow_subqueries: true,
1572 allow_parameters: true,
1573 allow_windows: false,
1574 };
1575 let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1576 let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1577 let project = (0..ecx.relation_type.arity()).collect();
1578 PlannedQuery {
1579 expr: expr.map(map_exprs),
1580 scope,
1581 order_by,
1582 limit,
1583 project,
1584 offset,
1585 group_size_hints: GroupSizeHints::default(),
1586 }
1587 }
1588 };
1589
1590 match &q.ctes {
1592 CteBlock::Simple(_) => {
1593 for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1594 if let Some(cte) = qcx.ctes.remove(&id) {
1595 planned_query.expr = HirRelationExpr::Let {
1596 name: cte.name,
1597 id: id.clone(),
1598 value: Box::new(value),
1599 body: Box::new(planned_query.expr),
1600 };
1601 }
1602 if let Some(shadowed_val) = shadowed_val {
1603 qcx.ctes.insert(id, shadowed_val);
1604 }
1605 }
1606 }
1607 CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1608 let MutRecBlockOptionExtracted {
1609 recursion_limit,
1610 return_at_recursion_limit,
1611 error_at_recursion_limit,
1612 seen: _,
1613 } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1614 let limit = match (
1615 recursion_limit,
1616 return_at_recursion_limit,
1617 error_at_recursion_limit,
1618 ) {
1619 (None, None, None) => None,
1620 (Some(max_iters), None, None) => {
1621 Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT))
1622 }
1623 (None, Some(max_iters), None) => Some((max_iters, true)),
1624 (None, None, Some(max_iters)) => Some((max_iters, false)),
1625 _ => {
1626 return Err(InvalidWmrRecursionLimit(
1627 "More than one recursion limit given. \
1628 Please give at most one of RECURSION LIMIT, \
1629 ERROR AT RECURSION LIMIT, \
1630 RETURN AT RECURSION LIMIT."
1631 .to_owned(),
1632 ));
1633 }
1634 }
1635 .try_map(|(max_iters, return_at_limit)| {
1636 Ok::<LetRecLimit, PlanError>(LetRecLimit {
1637 max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit(
1638 "Recursion limit has to be greater than 0.".to_owned(),
1639 ))?,
1640 return_at_limit: *return_at_limit,
1641 })
1642 })?;
1643
1644 let mut bindings = Vec::new();
1645 for (id, value, shadowed_val) in cte_bindings.into_iter() {
1646 if let Some(cte) = qcx.ctes.remove(&id) {
1647 bindings.push((cte.name, id, value, cte.desc.into_typ()));
1648 }
1649 if let Some(shadowed_val) = shadowed_val {
1650 qcx.ctes.insert(id, shadowed_val);
1651 }
1652 }
1653 if !bindings.is_empty() {
1654 planned_query.expr = HirRelationExpr::LetRec {
1655 limit,
1656 bindings,
1657 body: Box::new(planned_query.expr),
1658 }
1659 }
1660 }
1661 }
1662
1663 Ok(planned_query)
1664}
1665
1666pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1668 let offset = offset
1669 .try_into_literal_int64()
1670 .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1671 if offset < 0 {
1672 return Err(PlanError::InvalidOffset(format!(
1673 "must not be negative, got {}",
1674 offset
1675 )));
1676 }
1677 Ok(offset)
1678}
1679
1680generate_extracted_config!(
1681 MutRecBlockOption,
1682 (RecursionLimit, u64),
1683 (ReturnAtRecursionLimit, u64),
1684 (ErrorAtRecursionLimit, u64)
1685);
1686
1687pub fn plan_ctes(
1692 qcx: &mut QueryContext,
1693 q: &Query<Aug>,
1694) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1695 let mut result = Vec::new();
1697 let mut shadowed_descs = BTreeMap::new();
1700
1701 if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1703 sql_bail!(
1704 "WITH query name {} specified more than once",
1705 normalize::ident_ref(ident).quoted()
1706 )
1707 }
1708
1709 match &q.ctes {
1710 CteBlock::Simple(ctes) => {
1711 for cte in ctes.iter() {
1713 let cte_name = normalize::ident(cte.alias.name.clone());
1714 let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1715 let typ = qcx.relation_type(&val);
1716 let mut desc = RelationDesc::new(typ, scope.column_names());
1717 plan_utils::maybe_rename_columns(
1718 format!("CTE {}", cte.alias.name),
1719 &mut desc,
1720 &cte.alias.columns,
1721 )?;
1722 let shadowed = qcx.ctes.insert(
1724 cte.id,
1725 CteDesc {
1726 name: cte_name,
1727 desc,
1728 },
1729 );
1730
1731 result.push((cte.id, val, shadowed));
1732 }
1733 }
1734 CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1735 for cte in ctes.iter() {
1737 let cte_name = normalize::ident(cte.name.clone());
1738 let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1739 for column in cte.columns.iter() {
1740 desc_columns.push((
1741 normalize::column_name(column.name.clone()),
1742 SqlColumnType {
1743 scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1744 nullable: true,
1745 },
1746 ));
1747 }
1748 let desc = RelationDesc::from_names_and_types(desc_columns);
1749 let shadowed = qcx.ctes.insert(
1750 cte.id,
1751 CteDesc {
1752 name: cte_name,
1753 desc,
1754 },
1755 );
1756 if let Some(shadowed) = shadowed {
1758 shadowed_descs.insert(cte.id, shadowed);
1759 }
1760 }
1761
1762 for cte in ctes.iter() {
1764 let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1765
1766 let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1767
1768 if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1769 sql_bail!(
1772 "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1773 );
1774 }
1775
1776 if !proposed_typ.keys.is_empty() {
1777 sql_bail!("[internal error]: WMR CTEs do not support keys");
1780 }
1781
1782 let derived_typ = qcx.relation_type(&val);
1784
1785 let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1786 let cte_name = normalize::ident(cte.name.clone());
1787 let proposed_typ = proposed_typ
1788 .column_types
1789 .iter()
1790 .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1791 .collect::<Vec<_>>();
1792 let inferred_typ = derived_typ
1793 .column_types
1794 .iter()
1795 .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1796 .collect::<Vec<_>>();
1797 Err(PlanError::RecursiveTypeMismatch(
1798 cte_name,
1799 proposed_typ,
1800 inferred_typ,
1801 ))
1802 };
1803
1804 if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1805 return type_err(proposed_typ, derived_typ);
1806 }
1807
1808 let val = match cast_relation(
1810 qcx,
1811 CastContext::Assignment,
1816 val,
1817 proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1818 ) {
1819 Ok(val) => val,
1820 Err(_) => return type_err(proposed_typ, derived_typ),
1821 };
1822
1823 result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1824 }
1825 }
1826 }
1827
1828 Ok(result)
1829}
1830
1831pub fn plan_nested_query(
1832 qcx: &mut QueryContext,
1833 q: &Query<Aug>,
1834) -> Result<(HirRelationExpr, Scope), PlanError> {
1835 let PlannedQuery {
1836 mut expr,
1837 scope,
1838 order_by,
1839 limit,
1840 offset,
1841 project,
1842 group_size_hints,
1843 } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1844 if limit.is_some()
1845 || !offset
1846 .clone()
1847 .try_into_literal_int64()
1848 .is_ok_and(|offset| offset == 0)
1849 {
1850 expr = HirRelationExpr::top_k(
1851 expr,
1852 vec![],
1853 order_by,
1854 limit,
1855 offset,
1856 group_size_hints.limit_input_group_size,
1857 );
1858 }
1859 Ok((expr.project(project), scope))
1860}
1861
1862fn plan_set_expr(
1863 qcx: &mut QueryContext,
1864 q: &SetExpr<Aug>,
1865) -> Result<(HirRelationExpr, Scope), PlanError> {
1866 match q {
1867 SetExpr::Select(select) => {
1868 let order_by_exprs = Vec::new();
1869 let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
1870 assert!(plan.order_by.is_empty());
1873 Ok((plan.expr.project(plan.project), plan.scope))
1874 }
1875 SetExpr::SetOperation {
1876 op,
1877 all,
1878 left,
1879 right,
1880 } => {
1881 let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
1883 let (right_expr, right_scope) =
1884 qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;
1885
1886 let left_type = qcx.relation_type(&left_expr);
1888 let right_type = qcx.relation_type(&right_expr);
1889 if left_type.arity() != right_type.arity() {
1890 sql_bail!(
1891 "each {} query must have the same number of columns: {} vs {}",
1892 op,
1893 left_type.arity(),
1894 right_type.arity(),
1895 );
1896 }
1897
1898 let left_ecx = &ExprContext {
1903 qcx,
1904 name: &op.to_string(),
1905 scope: &left_scope,
1906 relation_type: &left_type,
1907 allow_aggregates: false,
1908 allow_subqueries: false,
1909 allow_parameters: false,
1910 allow_windows: false,
1911 };
1912 let right_ecx = &ExprContext {
1913 qcx,
1914 name: &op.to_string(),
1915 scope: &right_scope,
1916 relation_type: &right_type,
1917 allow_aggregates: false,
1918 allow_subqueries: false,
1919 allow_parameters: false,
1920 allow_windows: false,
1921 };
1922 let mut left_casts = vec![];
1923 let mut right_casts = vec![];
1924 for (i, (left_type, right_type)) in left_type
1925 .column_types
1926 .iter()
1927 .zip_eq(right_type.column_types.iter())
1928 .enumerate()
1929 {
1930 let types = &[
1931 CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
1932 CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
1933 ];
1934 let target =
1935 typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
1936 match typeconv::plan_cast(
1937 left_ecx,
1938 CastContext::Implicit,
1939 HirScalarExpr::column(i),
1940 &target,
1941 ) {
1942 Ok(expr) => left_casts.push(expr),
1943 Err(_) => sql_bail!(
1944 "{} types {} and {} cannot be matched",
1945 op,
1946 qcx.humanize_scalar_type(&left_type.scalar_type, false),
1947 qcx.humanize_scalar_type(&target, false),
1948 ),
1949 }
1950 match typeconv::plan_cast(
1951 right_ecx,
1952 CastContext::Implicit,
1953 HirScalarExpr::column(i),
1954 &target,
1955 ) {
1956 Ok(expr) => right_casts.push(expr),
1957 Err(_) => sql_bail!(
1958 "{} types {} and {} cannot be matched",
1959 op,
1960 qcx.humanize_scalar_type(&target, false),
1961 qcx.humanize_scalar_type(&right_type.scalar_type, false),
1962 ),
1963 }
1964 }
1965 let lhs = if left_casts
1966 .iter()
1967 .enumerate()
1968 .any(|(i, e)| e != &HirScalarExpr::column(i))
1969 {
1970 let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
1971 left_expr.map(left_casts).project(project_key)
1972 } else {
1973 left_expr
1974 };
1975 let rhs = if right_casts
1976 .iter()
1977 .enumerate()
1978 .any(|(i, e)| e != &HirScalarExpr::column(i))
1979 {
1980 let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
1981 right_expr.map(right_casts).project(project_key)
1982 } else {
1983 right_expr
1984 };
1985
1986 let relation_expr = match op {
1987 SetOperator::Union => {
1988 if *all {
1989 lhs.union(rhs)
1990 } else {
1991 lhs.union(rhs).distinct()
1992 }
1993 }
1994 SetOperator::Except => Hir::except(all, lhs, rhs),
1995 SetOperator::Intersect => {
1996 let left_clone = lhs.clone();
2002 if *all {
2003 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
2004 } else {
2005 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
2006 .distinct()
2007 }
2008 }
2009 };
2010 let scope = Scope::from_source(
2011 None,
2012 left_scope.column_names(),
2014 );
2015
2016 Ok((relation_expr, scope))
2017 }
2018 SetExpr::Values(Values(values)) => plan_values(qcx, values),
2019 SetExpr::Table(name) => {
2020 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2021 Ok((expr, scope))
2022 }
2023 SetExpr::Query(query) => {
2024 let (expr, scope) = plan_nested_query(qcx, query)?;
2025 Ok((expr, scope))
2026 }
2027 SetExpr::Show(stmt) => {
2028 if !qcx.lifetime.allow_show() {
2042 return Err(PlanError::ShowCommandInView);
2043 }
2044
2045 fn to_hirscope(
2048 plan: ShowCreatePlan,
2049 desc: StatementDesc,
2050 ) -> Result<(HirRelationExpr, Scope), PlanError> {
2051 let rows = vec![plan.row.iter().collect::<Vec<_>>()];
2052 let desc = desc.relation_desc.expect("must exist");
2053 let scope = Scope::from_source(None, desc.iter_names());
2054 let expr = HirRelationExpr::constant(rows, desc.into_typ());
2055 Ok((expr, scope))
2056 }
2057
2058 match stmt.clone() {
2059 ShowStatement::ShowColumns(stmt) => {
2060 show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
2061 }
2062 ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
2063 show::plan_show_create_connection(qcx.scx, stmt.clone())?,
2064 show::describe_show_create_connection(qcx.scx, stmt)?,
2065 ),
2066 ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
2067 show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
2068 show::describe_show_create_cluster(qcx.scx, stmt)?,
2069 ),
2070 ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
2071 show::plan_show_create_index(qcx.scx, stmt.clone())?,
2072 show::describe_show_create_index(qcx.scx, stmt)?,
2073 ),
2074 ShowStatement::ShowCreateSink(stmt) => to_hirscope(
2075 show::plan_show_create_sink(qcx.scx, stmt.clone())?,
2076 show::describe_show_create_sink(qcx.scx, stmt)?,
2077 ),
2078 ShowStatement::ShowCreateSource(stmt) => to_hirscope(
2079 show::plan_show_create_source(qcx.scx, stmt.clone())?,
2080 show::describe_show_create_source(qcx.scx, stmt)?,
2081 ),
2082 ShowStatement::ShowCreateTable(stmt) => to_hirscope(
2083 show::plan_show_create_table(qcx.scx, stmt.clone())?,
2084 show::describe_show_create_table(qcx.scx, stmt)?,
2085 ),
2086 ShowStatement::ShowCreateView(stmt) => to_hirscope(
2087 show::plan_show_create_view(qcx.scx, stmt.clone())?,
2088 show::describe_show_create_view(qcx.scx, stmt)?,
2089 ),
2090 ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
2091 show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
2092 show::describe_show_create_materialized_view(qcx.scx, stmt)?,
2093 ),
2094 ShowStatement::ShowCreateType(stmt) => to_hirscope(
2095 show::plan_show_create_type(qcx.scx, stmt.clone())?,
2096 show::describe_show_create_type(qcx.scx, stmt)?,
2097 ),
2098 ShowStatement::ShowObjects(stmt) => {
2099 show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
2100 }
2101 ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
2102 ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
2103 }
2104 }
2105 }
2106}
2107
2108fn plan_values(
2110 qcx: &QueryContext,
2111 values: &[Vec<Expr<Aug>>],
2112) -> Result<(HirRelationExpr, Scope), PlanError> {
2113 assert!(!values.is_empty());
2114
2115 let ecx = &ExprContext {
2116 qcx,
2117 name: "VALUES",
2118 scope: &Scope::empty(),
2119 relation_type: &SqlRelationType::empty(),
2120 allow_aggregates: false,
2121 allow_subqueries: true,
2122 allow_parameters: true,
2123 allow_windows: false,
2124 };
2125
2126 let ncols = values[0].len();
2127 let nrows = values.len();
2128
2129 let mut cols = vec![vec![]; ncols];
2132 for row in values {
2133 if row.len() != ncols {
2134 sql_bail!(
2135 "VALUES expression has varying number of columns: {} vs {}",
2136 row.len(),
2137 ncols
2138 );
2139 }
2140 for (i, v) in row.iter().enumerate() {
2141 cols[i].push(v);
2142 }
2143 }
2144
2145 let mut col_iters = Vec::with_capacity(ncols);
2147 let mut col_types = Vec::with_capacity(ncols);
2148 for col in &cols {
2149 let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2150 let mut col_type = ecx.column_type(&col[0]);
2151 for val in &col[1..] {
2152 col_type = col_type.sql_union(&ecx.column_type(val))?; }
2154 col_types.push(col_type);
2155 col_iters.push(col.into_iter());
2156 }
2157
2158 let mut exprs = vec![];
2160 for _ in 0..nrows {
2161 for i in 0..ncols {
2162 exprs.push(col_iters[i].next().unwrap());
2163 }
2164 }
2165 let out = HirRelationExpr::CallTable {
2166 func: TableFunc::Wrap {
2167 width: ncols,
2168 types: col_types,
2169 },
2170 exprs,
2171 };
2172
2173 let mut scope = Scope::empty();
2175 for i in 0..ncols {
2176 let name = format!("column{}", i + 1);
2177 scope.items.push(ScopeItem::from_column_name(name));
2178 }
2179
2180 Ok((out, scope))
2181}
2182
2183fn plan_values_insert(
2193 qcx: &QueryContext,
2194 target_names: &[&ColumnName],
2195 target_types: &[&SqlScalarType],
2196 values: &[Vec<Expr<Aug>>],
2197) -> Result<HirRelationExpr, PlanError> {
2198 assert!(!values.is_empty());
2199
2200 if !values.iter().map(|row| row.len()).all_equal() {
2201 sql_bail!("VALUES lists must all be the same length");
2202 }
2203
2204 let ecx = &ExprContext {
2205 qcx,
2206 name: "VALUES",
2207 scope: &Scope::empty(),
2208 relation_type: &SqlRelationType::empty(),
2209 allow_aggregates: false,
2210 allow_subqueries: true,
2211 allow_parameters: true,
2212 allow_windows: false,
2213 };
2214
2215 let mut exprs = vec![];
2216 let mut types = vec![];
2217 for row in values {
2218 if row.len() > target_names.len() {
2219 sql_bail!("INSERT has more expressions than target columns");
2220 }
2221 for (column, val) in row.into_iter().enumerate() {
2222 let target_type = &target_types[column];
2223 let val = plan_expr(ecx, val)?;
2224 let val = typeconv::plan_coerce(ecx, val, target_type)?;
2225 let source_type = &ecx.scalar_type(&val);
2226 let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2227 Ok(val) => val,
2228 Err(_) => sql_bail!(
2229 "column {} is of type {} but expression is of type {}",
2230 target_names[column].quoted(),
2231 qcx.humanize_scalar_type(target_type, false),
2232 qcx.humanize_scalar_type(source_type, false),
2233 ),
2234 };
2235 if column >= types.len() {
2236 types.push(ecx.column_type(&val));
2237 } else {
2238 types[column] = types[column].sql_union(&ecx.column_type(&val))?; }
2240 exprs.push(val);
2241 }
2242 }
2243
2244 Ok(HirRelationExpr::CallTable {
2245 func: TableFunc::Wrap {
2246 width: values[0].len(),
2247 types,
2248 },
2249 exprs,
2250 })
2251}
2252
2253fn plan_join_identity() -> (HirRelationExpr, Scope) {
2254 let typ = SqlRelationType::new(vec![]);
2255 let expr = HirRelationExpr::constant(vec![vec![]], typ);
2256 let scope = Scope::empty();
2257 (expr, scope)
2258}
2259
2260#[derive(Debug)]
2266struct SelectPlan {
2267 expr: HirRelationExpr,
2268 scope: Scope,
2269 order_by: Vec<ColumnOrder>,
2270 project: Vec<usize>,
2271}
2272
2273generate_extracted_config!(
2274 SelectOption,
2275 (ExpectedGroupSize, u64),
2276 (AggregateInputGroupSize, u64),
2277 (DistinctOnInputGroupSize, u64),
2278 (LimitInputGroupSize, u64)
2279);
2280
2281fn plan_select_from_where(
2299 qcx: &QueryContext,
2300 mut s: Select<Aug>,
2301 mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2302) -> Result<SelectPlan, PlanError> {
2303 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2310 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2311
2312 let (mut relation_expr, mut from_scope) =
2314 s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2315 let (left, left_scope) = l;
2316 plan_join(
2317 qcx,
2318 left,
2319 left_scope,
2320 &Join {
2321 relation: TableFactor::NestedJoin {
2322 join: Box::new(twj.clone()),
2323 alias: None,
2324 },
2325 join_operator: JoinOperator::CrossJoin,
2326 },
2327 )
2328 })?;
2329
2330 if let Some(selection) = &s.selection {
2332 let ecx = &ExprContext {
2333 qcx,
2334 name: "WHERE clause",
2335 scope: &from_scope,
2336 relation_type: &qcx.relation_type(&relation_expr),
2337 allow_aggregates: false,
2338 allow_subqueries: true,
2339 allow_parameters: true,
2340 allow_windows: false,
2341 };
2342 let expr = plan_expr(ecx, selection)
2343 .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2344 .type_as(ecx, &SqlScalarType::Bool)?;
2345 relation_expr = relation_expr.filter(vec![expr]);
2346 }
2347
2348 let (aggregates, table_funcs) = {
2351 let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2352 visitor.visit_select_mut(&mut s);
2353 for o in order_by_exprs.iter_mut() {
2354 visitor.visit_order_by_expr_mut(o);
2355 }
2356 visitor.into_result()?
2357 };
2358 let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2359 if !table_funcs.is_empty() {
2360 let (expr, scope) = plan_scalar_table_funcs(
2361 qcx,
2362 table_funcs,
2363 &mut table_func_names,
2364 &relation_expr,
2365 &from_scope,
2366 )?;
2367 relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2368 from_scope = from_scope.product(scope)?;
2369 }
2370
2371 let projection = {
2373 let ecx = &ExprContext {
2374 qcx,
2375 name: "SELECT clause",
2376 scope: &from_scope,
2377 relation_type: &qcx.relation_type(&relation_expr),
2378 allow_aggregates: true,
2379 allow_subqueries: true,
2380 allow_parameters: true,
2381 allow_windows: true,
2382 };
2383 let mut out = vec![];
2384 for si in &s.projection {
2385 if *si == SelectItem::Wildcard && s.from.is_empty() {
2386 sql_bail!("SELECT * with no tables specified is not valid");
2387 }
2388 out.extend(expand_select_item(ecx, si, &table_func_names)?);
2389 }
2390 out
2391 };
2392
2393 let (mut group_scope, select_all_mapping) = {
2397 let ecx = &ExprContext {
2399 qcx,
2400 name: "GROUP BY clause",
2401 scope: &from_scope,
2402 relation_type: &qcx.relation_type(&relation_expr),
2403 allow_aggregates: false,
2404 allow_subqueries: true,
2405 allow_parameters: true,
2406 allow_windows: false,
2407 };
2408 let mut group_key = vec![];
2409 let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2410 let mut group_hir_exprs = vec![];
2411 let mut group_scope = Scope::empty();
2412 let mut select_all_mapping = BTreeMap::new();
2413
2414 for group_expr in &s.group_by {
2415 let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2416 let new_column = group_key.len();
2417
2418 if let Some(group_expr) = group_expr {
2419 if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2423 existing_scope_item.exprs.insert(group_expr.clone());
2424 continue;
2425 }
2426 }
2427
2428 let mut scope_item = if let HirScalarExpr::Column(
2429 ColumnRef {
2430 level: 0,
2431 column: old_column,
2432 },
2433 _name,
2434 ) = &expr
2435 {
2436 select_all_mapping.insert(*old_column, new_column);
2442 let scope_item = ecx.scope.items[*old_column].clone();
2443 scope_item
2444 } else {
2445 ScopeItem::empty()
2446 };
2447
2448 if let Some(group_expr) = group_expr.cloned() {
2449 scope_item.exprs.insert(group_expr);
2450 }
2451
2452 group_key.push(from_scope.len() + group_exprs.len());
2453 group_hir_exprs.push(expr.clone());
2454 group_exprs.insert(expr, scope_item);
2455 }
2456
2457 assert_eq!(group_hir_exprs.len(), group_exprs.len());
2458 for expr in &group_hir_exprs {
2459 if let Some(scope_item) = group_exprs.remove(expr) {
2460 group_scope.items.push(scope_item);
2461 }
2462 }
2463
2464 let ecx = &ExprContext {
2466 qcx,
2467 name: "aggregate function",
2468 scope: &from_scope,
2469 relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2470 allow_aggregates: false,
2471 allow_subqueries: true,
2472 allow_parameters: true,
2473 allow_windows: false,
2474 };
2475 let mut agg_exprs = vec![];
2476 for sql_function in aggregates {
2477 if sql_function.over.is_some() {
2478 unreachable!(
2479 "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2480 );
2481 }
2482 agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2483 group_scope
2484 .items
2485 .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2486 }
2487 if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2488 relation_expr = relation_expr.map(group_hir_exprs).reduce(
2490 group_key,
2491 agg_exprs,
2492 group_size_hints.aggregate_input_group_size,
2493 );
2494
2495 for i in 0..from_scope.len() {
2501 if !select_all_mapping.contains_key(&i) {
2502 let scope_item = &ecx.scope.items[i];
2503 group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2504 table_name: scope_item.table_name.clone(),
2505 column_name: scope_item.column_name.clone(),
2506 allow_unqualified_references: scope_item.allow_unqualified_references,
2507 });
2508 }
2509 }
2510
2511 (group_scope, select_all_mapping)
2512 } else {
2513 (
2515 from_scope.clone(),
2516 (0..from_scope.len()).map(|i| (i, i)).collect(),
2517 )
2518 }
2519 };
2520
2521 if let Some(ref having) = s.having {
2523 let ecx = &ExprContext {
2524 qcx,
2525 name: "HAVING clause",
2526 scope: &group_scope,
2527 relation_type: &qcx.relation_type(&relation_expr),
2528 allow_aggregates: true,
2529 allow_subqueries: true,
2530 allow_parameters: true,
2531 allow_windows: false,
2532 };
2533 let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2534 relation_expr = relation_expr.filter(vec![expr]);
2535 }
2536
2537 let window_funcs = {
2550 let mut visitor = WindowFuncCollector::default();
2551 visitor.visit_select(&s);
2555 for o in order_by_exprs.iter() {
2556 visitor.visit_order_by_expr(o);
2557 }
2558 visitor.into_result()
2559 };
2560 for window_func in window_funcs {
2561 let ecx = &ExprContext {
2562 qcx,
2563 name: "window function",
2564 scope: &group_scope,
2565 relation_type: &qcx.relation_type(&relation_expr),
2566 allow_aggregates: true,
2567 allow_subqueries: true,
2568 allow_parameters: true,
2569 allow_windows: true,
2570 };
2571 relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2572 group_scope.items.push(ScopeItem::from_expr(window_func));
2573 }
2574 if let Some(ref qualify) = s.qualify {
2581 let ecx = &ExprContext {
2582 qcx,
2583 name: "QUALIFY clause",
2584 scope: &group_scope,
2585 relation_type: &qcx.relation_type(&relation_expr),
2586 allow_aggregates: true,
2587 allow_subqueries: true,
2588 allow_parameters: true,
2589 allow_windows: true,
2590 };
2591 let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2592 relation_expr = relation_expr.filter(vec![expr]);
2593 }
2594
2595 let output_columns = {
2597 let mut new_exprs = vec![];
2598 let mut new_type = qcx.relation_type(&relation_expr);
2599 let mut output_columns = vec![];
2600 for (select_item, column_name) in &projection {
2601 let ecx = &ExprContext {
2602 qcx,
2603 name: "SELECT clause",
2604 scope: &group_scope,
2605 relation_type: &new_type,
2606 allow_aggregates: true,
2607 allow_subqueries: true,
2608 allow_parameters: true,
2609 allow_windows: true,
2610 };
2611 let expr = match select_item {
2612 ExpandedSelectItem::InputOrdinal(i) => {
2613 if let Some(column) = select_all_mapping.get(i).copied() {
2614 HirScalarExpr::column(column)
2615 } else {
2616 return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2617 }
2618 }
2619 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2620 };
2621 if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2622 output_columns.push((column, column_name));
2624 } else {
2625 let typ = ecx.column_type(&expr);
2632 new_type.column_types.push(typ);
2633 new_exprs.push(expr);
2634 output_columns.push((group_scope.len(), column_name));
2635 group_scope
2636 .items
2637 .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2638 }
2639 }
2640 relation_expr = relation_expr.map(new_exprs);
2641 output_columns
2642 };
2643 let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2644
2645 let order_by = {
2647 let relation_type = qcx.relation_type(&relation_expr);
2648 let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2649 &ExprContext {
2650 qcx,
2651 name: "ORDER BY clause",
2652 scope: &group_scope,
2653 relation_type: &relation_type,
2654 allow_aggregates: true,
2655 allow_subqueries: true,
2656 allow_parameters: true,
2657 allow_windows: true,
2658 },
2659 &order_by_exprs,
2660 &output_columns,
2661 )?;
2662
2663 match s.distinct {
2664 None => relation_expr = relation_expr.map(map_exprs),
2665 Some(Distinct::EntireRow) => {
2666 if relation_type.arity() == 0 {
2667 sql_bail!("SELECT DISTINCT must have at least one column");
2668 }
2669 if !try_push_projection_order_by(
2673 &mut relation_expr,
2674 &mut project_key,
2675 &mut order_by,
2676 ) {
2677 sql_bail!(
2678 "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2679 );
2680 }
2681 assert!(map_exprs.is_empty());
2682 relation_expr = relation_expr.distinct();
2683 }
2684 Some(Distinct::On(exprs)) => {
2685 let ecx = &ExprContext {
2686 qcx,
2687 name: "DISTINCT ON clause",
2688 scope: &group_scope,
2689 relation_type: &qcx.relation_type(&relation_expr),
2690 allow_aggregates: true,
2691 allow_subqueries: true,
2692 allow_parameters: true,
2693 allow_windows: true,
2694 };
2695
2696 let mut distinct_exprs = vec![];
2697 for expr in &exprs {
2698 let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2699 distinct_exprs.push(expr);
2700 }
2701
2702 let mut distinct_key = vec![];
2703
2704 let arity = relation_type.arity();
2714 for ord in order_by.iter().take(distinct_exprs.len()) {
2715 let mut expr = &HirScalarExpr::column(ord.column);
2718 if ord.column >= arity {
2719 expr = &map_exprs[ord.column - arity];
2720 };
2721 match distinct_exprs.iter().position(move |e| e == expr) {
2722 None => sql_bail!(
2723 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2724 ),
2725 Some(pos) => {
2726 distinct_exprs.remove(pos);
2727 }
2728 }
2729 distinct_key.push(ord.column);
2730 }
2731
2732 for expr in distinct_exprs {
2734 let column = match expr {
2737 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2738 _ => {
2739 map_exprs.push(expr);
2740 arity + map_exprs.len() - 1
2741 }
2742 };
2743 distinct_key.push(column);
2744 }
2745
2746 let distinct_len = distinct_key.len();
2751 relation_expr = HirRelationExpr::top_k(
2752 relation_expr.map(map_exprs),
2753 distinct_key,
2754 order_by.iter().skip(distinct_len).cloned().collect(),
2755 Some(HirScalarExpr::literal(
2756 Datum::Int64(1),
2757 SqlScalarType::Int64,
2758 )),
2759 HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2760 group_size_hints.distinct_on_input_group_size,
2761 );
2762 }
2763 }
2764
2765 order_by
2766 };
2767
2768 let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2773
2774 Ok(SelectPlan {
2775 expr: relation_expr,
2776 scope,
2777 order_by,
2778 project: project_key,
2779 })
2780}
2781
2782fn plan_scalar_table_funcs(
2783 qcx: &QueryContext,
2784 table_funcs: BTreeMap<Function<Aug>, String>,
2785 table_func_names: &mut BTreeMap<String, Ident>,
2786 relation_expr: &HirRelationExpr,
2787 from_scope: &Scope,
2788) -> Result<(HirRelationExpr, Scope), PlanError> {
2789 let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2790
2791 for (table_func, id) in table_funcs.iter() {
2792 table_func_names.insert(
2793 id.clone(),
2794 Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2796 );
2797 }
2798 if table_funcs.len() == 1 {
2801 let (table_func, id) = table_funcs.iter().next().unwrap();
2802 let (expr, mut scope) =
2803 plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2804
2805 let num_cols = scope.len();
2807 for i in 0..scope.len() {
2808 scope.items[i].table_name = Some(PartialItemName {
2809 database: None,
2810 schema: None,
2811 item: id.clone(),
2812 });
2813 scope.items[i].from_single_column_function = num_cols == 1;
2814 scope.items[i].allow_unqualified_references = false;
2815 }
2816 return Ok((expr, scope));
2817 }
2818 let (expr, mut scope, num_cols) =
2820 plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2821
2822 let mut i = 0;
2824 for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2825 for _ in 0..num_cols {
2826 scope.items[i].table_name = Some(PartialItemName {
2827 database: None,
2828 schema: None,
2829 item: id.clone(),
2830 });
2831 scope.items[i].from_single_column_function = num_cols == 1;
2832 scope.items[i].allow_unqualified_references = false;
2833 i += 1;
2834 }
2835 scope.items[i].table_name = Some(PartialItemName {
2839 database: None,
2840 schema: None,
2841 item: id.clone(),
2842 });
2843 scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2844 scope.items[i].allow_unqualified_references = false;
2845 i += 1;
2846 }
2847 scope.items[i].allow_unqualified_references = false;
2849 Ok((expr, scope))
2850}
2851
2852fn plan_group_by_expr<'a>(
2859 ecx: &ExprContext,
2860 group_expr: &'a Expr<Aug>,
2861 projection: &'a [(ExpandedSelectItem, ColumnName)],
2862) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2863 let plan_projection = |column: usize| match &projection[column].0 {
2864 ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2865 ExpandedSelectItem::Expr(expr) => {
2866 Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2867 }
2868 };
2869
2870 if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2873 return plan_projection(column);
2874 }
2875
2876 match group_expr {
2880 Expr::Identifier(names) => match plan_identifier(ecx, names) {
2881 Err(PlanError::UnknownColumn {
2882 table: None,
2883 column,
2884 similar,
2885 }) => {
2886 let mut iter = projection.iter().map(|(_expr, name)| name);
2889 if let Some(i) = iter.position(|n| *n == column) {
2890 if iter.any(|n| *n == column) {
2891 Err(PlanError::AmbiguousColumn(column))
2892 } else {
2893 plan_projection(i)
2894 }
2895 } else {
2896 Err(PlanError::UnknownColumn {
2899 table: None,
2900 column,
2901 similar,
2902 })
2903 }
2904 }
2905 res => Ok((Some(group_expr), res?)),
2906 },
2907 _ => Ok((
2908 Some(group_expr),
2909 plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2910 )),
2911 }
2912}
2913
2914pub(crate) fn plan_order_by_exprs(
2922 ecx: &ExprContext,
2923 order_by_exprs: &[OrderByExpr<Aug>],
2924 output_columns: &[(usize, &ColumnName)],
2925) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2926 let mut order_by = vec![];
2927 let mut map_exprs = vec![];
2928 for obe in order_by_exprs {
2929 let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2930 let column = match expr {
2933 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2934 _ => {
2935 map_exprs.push(expr);
2936 ecx.relation_type.arity() + map_exprs.len() - 1
2937 }
2938 };
2939 order_by.push(resolve_desc_and_nulls_last(obe, column));
2940 }
2941 Ok((order_by, map_exprs))
2942}
2943
2944fn plan_order_by_or_distinct_expr(
2962 ecx: &ExprContext,
2963 expr: &Expr<Aug>,
2964 output_columns: &[(usize, &ColumnName)],
2965) -> Result<HirScalarExpr, PlanError> {
2966 if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2967 return Ok(HirScalarExpr::column(output_columns[i].0));
2968 }
2969
2970 if let Expr::Identifier(names) = expr {
2971 if let [name] = &names[..] {
2972 let name = normalize::column_name(name.clone());
2973 let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2974 if let Some((i, _)) = iter.next() {
2975 match iter.next() {
2976 Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2980 _ => return Ok(HirScalarExpr::column(*i)),
2981 }
2982 }
2983 }
2984 }
2985
2986 plan_expr(ecx, expr)?.type_as_any(ecx)
2987}
2988
2989fn plan_table_with_joins(
2990 qcx: &QueryContext,
2991 table_with_joins: &TableWithJoins<Aug>,
2992) -> Result<(HirRelationExpr, Scope), PlanError> {
2993 let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2994 for join in &table_with_joins.joins {
2995 let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2996 expr = new_expr;
2997 scope = new_scope;
2998 }
2999 Ok((expr, scope))
3000}
3001
3002fn plan_table_factor(
3003 qcx: &QueryContext,
3004 table_factor: &TableFactor<Aug>,
3005) -> Result<(HirRelationExpr, Scope), PlanError> {
3006 match table_factor {
3007 TableFactor::Table { name, alias } => {
3008 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
3009 let scope = plan_table_alias(scope, alias.as_ref())?;
3010 Ok((expr, scope))
3011 }
3012
3013 TableFactor::Function {
3014 function,
3015 alias,
3016 with_ordinality,
3017 } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
3018
3019 TableFactor::RowsFrom {
3020 functions,
3021 alias,
3022 with_ordinality,
3023 } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
3024
3025 TableFactor::Derived {
3026 lateral,
3027 subquery,
3028 alias,
3029 } => {
3030 let mut qcx = (*qcx).clone();
3031 if !lateral {
3032 for scope in &mut qcx.outer_scopes {
3036 if scope.lateral_barrier {
3037 break;
3038 }
3039 scope.items.clear();
3040 }
3041 }
3042 qcx.outer_scopes[0].lateral_barrier = true;
3043 let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3044 let scope = plan_table_alias(scope, alias.as_ref())?;
3045 Ok((expr, scope))
3046 }
3047
3048 TableFactor::NestedJoin { join, alias } => {
3049 let (expr, scope) = plan_table_with_joins(qcx, join)?;
3050 let scope = plan_table_alias(scope, alias.as_ref())?;
3051 Ok((expr, scope))
3052 }
3053 }
3054}
3055
3056fn plan_rows_from(
3098 qcx: &QueryContext,
3099 functions: &[Function<Aug>],
3100 alias: Option<&TableAlias>,
3101 with_ordinality: bool,
3102) -> Result<(HirRelationExpr, Scope), PlanError> {
3103 if let [function] = functions {
3106 return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3107 }
3108
3109 let (expr, mut scope, num_cols) = plan_rows_from_internal(
3113 qcx,
3114 functions,
3115 Some(functions[0].name.full_item_name().clone()),
3116 )?;
3117
3118 let mut columns = Vec::new();
3120 let mut offset = 0;
3121 for (idx, cols) in num_cols.into_iter().enumerate() {
3123 for i in 0..cols {
3124 columns.push(offset + i);
3125 }
3126 offset += cols + 1;
3127
3128 scope.items.remove(offset - idx - 1);
3131 }
3132
3133 if with_ordinality {
3136 columns.push(offset);
3137 } else {
3138 scope.items.pop();
3139 }
3140
3141 let expr = expr.project(columns);
3142
3143 let scope = plan_table_alias(scope, alias)?;
3144 Ok((expr, scope))
3145}
3146
3147fn plan_rows_from_internal<'a>(
3170 qcx: &QueryContext,
3171 functions: impl IntoIterator<Item = &'a Function<Aug>>,
3172 table_name: Option<FullItemName>,
3173) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3174 let mut functions = functions.into_iter();
3175 let mut num_cols = Vec::new();
3176
3177 let (mut left_expr, mut left_scope) =
3181 plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3182 num_cols.push(left_scope.len() - 1);
3183 left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3185 left_scope
3186 .items
3187 .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));
3188
3189 for function in functions {
3190 let qcx = qcx.empty_derived_context();
3192 let (right_expr, mut right_scope) =
3193 plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3194 num_cols.push(right_scope.len() - 1);
3195 let left_col = left_scope.len() - 1;
3196 let right_col = left_scope.len() + right_scope.len() - 1;
3197 let on = HirScalarExpr::call_binary(
3198 HirScalarExpr::column(left_col),
3199 HirScalarExpr::column(right_col),
3200 expr_func::Eq,
3201 );
3202 left_expr = left_expr
3203 .join(right_expr, on, JoinKind::FullOuter)
3204 .map(vec![HirScalarExpr::call_variadic(
3205 VariadicFunc::Coalesce,
3206 vec![
3207 HirScalarExpr::column(left_col),
3208 HirScalarExpr::column(right_col),
3209 ],
3210 )]);
3211
3212 left_expr = left_expr.project(
3215 (0..left_col) .chain(left_col + 1..right_col + 2) .collect(),
3218 );
3219 right_scope.items.push(left_scope.items.pop().unwrap());
3221
3222 left_scope.items.extend(right_scope.items);
3223 }
3224
3225 Ok((left_expr, left_scope, num_cols))
3226}
3227
3228fn plan_solitary_table_function(
3232 qcx: &QueryContext,
3233 function: &Function<Aug>,
3234 alias: Option<&TableAlias>,
3235 with_ordinality: bool,
3236) -> Result<(HirRelationExpr, Scope), PlanError> {
3237 let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3238
3239 let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3240 if single_column_function {
3241 let item = &mut scope.items[0];
3242
3243 item.from_single_column_function = true;
3246
3247 if let Some(alias) = alias {
3262 if let ScopeItem {
3263 table_name: Some(table_name),
3264 column_name,
3265 ..
3266 } = item
3267 {
3268 if table_name.item.as_str() == column_name.as_str() {
3269 *column_name = normalize::column_name(alias.name.clone());
3270 }
3271 }
3272 }
3273 }
3274
3275 let scope = plan_table_alias(scope, alias)?;
3276 Ok((expr, scope))
3277}
3278
3279fn plan_table_function_internal(
3284 qcx: &QueryContext,
3285 Function {
3286 name,
3287 args,
3288 filter,
3289 over,
3290 distinct,
3291 }: &Function<Aug>,
3292 with_ordinality: bool,
3293 table_name: Option<FullItemName>,
3294) -> Result<(HirRelationExpr, Scope), PlanError> {
3295 assert_none!(filter, "cannot parse table function with FILTER");
3296 assert_none!(over, "cannot parse table function with OVER");
3297 assert!(!*distinct, "cannot parse table function with DISTINCT");
3298
3299 let ecx = &ExprContext {
3300 qcx,
3301 name: "table function arguments",
3302 scope: &Scope::empty(),
3303 relation_type: &SqlRelationType::empty(),
3304 allow_aggregates: false,
3305 allow_subqueries: true,
3306 allow_parameters: true,
3307 allow_windows: false,
3308 };
3309
3310 let scalar_args = match args {
3311 FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3312 FunctionArgs::Args { args, order_by } => {
3313 if !order_by.is_empty() {
3314 sql_bail!(
3315 "ORDER BY specified, but {} is not an aggregate function",
3316 name
3317 );
3318 }
3319 plan_exprs(ecx, args)?
3320 }
3321 };
3322
3323 let table_name = match table_name {
3324 Some(table_name) => table_name.item,
3325 None => name.full_item_name().item.clone(),
3326 };
3327
3328 let scope_name = Some(PartialItemName {
3329 database: None,
3330 schema: None,
3331 item: table_name,
3332 });
3333
3334 let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3335 Func::Table(impls) => {
3336 let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3337 let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3338 let expr = match tf.imp {
3339 TableFuncImpl::CallTable { mut func, exprs } => {
3340 if with_ordinality {
3341 func = TableFunc::with_ordinality(func.clone()).ok_or(
3342 PlanError::Unsupported {
3343 feature: format!("WITH ORDINALITY on {}", func),
3344 discussion_no: None,
3345 },
3346 )?;
3347 }
3348 HirRelationExpr::CallTable { func, exprs }
3349 }
3350 TableFuncImpl::Expr(expr) => {
3351 if !with_ordinality {
3352 expr
3353 } else {
3354 if qcx
3358 .scx
3359 .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3360 {
3361 tracing::error!(
3365 %name,
3366 "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3367 );
3368 expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3369 func: WindowExprType::Scalar(ScalarWindowExpr {
3370 func: ScalarWindowFunc::RowNumber,
3371 order_by: vec![],
3372 }),
3373 partition_by: vec![],
3374 order_by: vec![],
3375 })])
3376 } else {
3377 bail_unsupported!(format!(
3378 "WITH ORDINALITY or ROWS FROM with {}",
3379 name
3380 ));
3381 }
3382 }
3383 }
3384 };
3385 (expr, scope)
3386 }
3387 Func::Scalar(impls) => {
3388 let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3389 let output = expr.typ(
3390 &qcx.outer_relation_types,
3391 &SqlRelationType::new(vec![]),
3392 &qcx.scx.param_types.borrow(),
3393 );
3394
3395 let relation = SqlRelationType::new(vec![output]);
3396
3397 let function_ident = Ident::new(name.full_item_name().item.clone())?;
3398 let column_name = normalize::column_name(function_ident);
3399 let name = column_name.to_string();
3400
3401 let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3402
3403 let mut func = TableFunc::TabletizedScalar { relation, name };
3404 if with_ordinality {
3405 func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3406 feature: format!("WITH ORDINALITY on {}", func),
3407 discussion_no: None,
3408 })?;
3409 }
3410 (
3411 HirRelationExpr::CallTable {
3412 func,
3413 exprs: vec![expr],
3414 },
3415 scope,
3416 )
3417 }
3418 o => sql_bail!(
3419 "{} functions are not supported in functions in FROM",
3420 o.class()
3421 ),
3422 };
3423
3424 if with_ordinality {
3425 scope
3426 .items
3427 .push(ScopeItem::from_name(scope_name, "ordinality"));
3428 }
3429
3430 Ok((expr, scope))
3431}
3432
3433fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3434 if let Some(TableAlias {
3435 name,
3436 columns,
3437 strict,
3438 }) = alias
3439 {
3440 if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3441 sql_bail!(
3442 "{} has {} columns available but {} columns specified",
3443 name,
3444 scope.items.len(),
3445 columns.len()
3446 );
3447 }
3448
3449 let table_name = normalize::ident(name.to_owned());
3450 for (i, item) in scope.items.iter_mut().enumerate() {
3451 item.table_name = if item.allow_unqualified_references {
3452 Some(PartialItemName {
3453 database: None,
3454 schema: None,
3455 item: table_name.clone(),
3456 })
3457 } else {
3458 None
3492 };
3493 item.column_name = columns
3494 .get(i)
3495 .map(|a| normalize::column_name(a.clone()))
3496 .unwrap_or_else(|| item.column_name.clone());
3497 }
3498 }
3499 Ok(scope)
3500}
3501
3502fn invent_column_name(
3506 ecx: &ExprContext,
3507 expr: &Expr<Aug>,
3508 table_func_names: &BTreeMap<String, Ident>,
3509) -> Option<ColumnName> {
3510 #[derive(Debug)]
3517 enum NameQuality {
3518 Low,
3519 High,
3520 }
3521
3522 fn invent(
3523 ecx: &ExprContext,
3524 expr: &Expr<Aug>,
3525 table_func_names: &BTreeMap<String, Ident>,
3526 ) -> Option<(ColumnName, NameQuality)> {
3527 match expr {
3528 Expr::Identifier(names) => {
3529 if let [name] = names.as_slice() {
3530 if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3531 return Some((
3532 normalize::column_name(table_func_name.clone()),
3533 NameQuality::High,
3534 ));
3535 }
3536 }
3537 names
3538 .last()
3539 .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3540 }
3541 Expr::Value(v) => match v {
3542 Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3545 Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3546 _ => None,
3547 },
3548 Expr::Function(func) => {
3549 let (schema, item) = match &func.name {
3550 ResolvedItemName::Item {
3551 qualifiers,
3552 full_name,
3553 ..
3554 } => (&qualifiers.schema_spec, full_name.item.clone()),
3555 _ => unreachable!(),
3556 };
3557
3558 if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3559 || schema
3560 == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3561 {
3562 None
3563 } else {
3564 Some((item.into(), NameQuality::High))
3565 }
3566 }
3567 Expr::HomogenizingFunction { function, .. } => Some((
3568 function.to_string().to_lowercase().into(),
3569 NameQuality::High,
3570 )),
3571 Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3572 Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3573 Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3574 Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3575 Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
3576 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3577 _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3578 },
3579 Expr::Case { else_result, .. } => {
3580 match else_result
3581 .as_ref()
3582 .and_then(|else_result| invent(ecx, else_result, table_func_names))
3583 {
3584 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3585 _ => Some(("case".into(), NameQuality::Low)),
3586 }
3587 }
3588 Expr::FieldAccess { field, .. } => {
3589 Some((normalize::column_name(field.clone()), NameQuality::High))
3590 }
3591 Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3592 Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
3593 Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3594 let (_expr, scope) =
3598 plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
3599 scope
3600 .items
3601 .first()
3602 .map(|name| (name.column_name.clone(), NameQuality::High))
3603 }
3604 Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3605 _ => None,
3606 }
3607 }
3608
3609 invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
3610}
3611
3612#[derive(Debug)]
3613enum ExpandedSelectItem<'a> {
3614 InputOrdinal(usize),
3615 Expr(Cow<'a, Expr<Aug>>),
3616}
3617
3618impl ExpandedSelectItem<'_> {
3619 fn as_expr(&self) -> Option<&Expr<Aug>> {
3620 match self {
3621 ExpandedSelectItem::InputOrdinal(_) => None,
3622 ExpandedSelectItem::Expr(expr) => Some(expr),
3623 }
3624 }
3625}
3626
3627fn expand_select_item<'a>(
3628 ecx: &ExprContext,
3629 s: &'a SelectItem<Aug>,
3630 table_func_names: &BTreeMap<String, Ident>,
3631) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3632 match s {
3633 SelectItem::Expr {
3634 expr: Expr::QualifiedWildcard(table_name),
3635 alias: _,
3636 } => {
3637 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3638 let table_name =
3639 normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3640 let out: Vec<_> = ecx
3641 .scope
3642 .items
3643 .iter()
3644 .enumerate()
3645 .filter(|(_i, item)| item.is_from_table(&table_name))
3646 .map(|(i, item)| {
3647 let name = item.column_name.clone();
3648 (ExpandedSelectItem::InputOrdinal(i), name)
3649 })
3650 .collect();
3651 if out.is_empty() {
3652 sql_bail!("no table named '{}' in scope", table_name);
3653 }
3654 Ok(out)
3655 }
3656 SelectItem::Expr {
3657 expr: Expr::WildcardAccess(sql_expr),
3658 alias: _,
3659 } => {
3660 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3661 let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3667 let fields = match ecx.scalar_type(&expr) {
3668 SqlScalarType::Record { fields, .. } => fields,
3669 ty => sql_bail!(
3670 "type {} is not composite",
3671 ecx.humanize_scalar_type(&ty, false)
3672 ),
3673 };
3674 let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3675 if let Expr::Identifier(ident) = sql_expr.as_ref() {
3676 if let [name] = ident.as_slice() {
3677 if let Ok(items) = ecx.scope.items_from_table(
3678 &[],
3679 &PartialItemName {
3680 database: None,
3681 schema: None,
3682 item: name.as_str().to_string(),
3683 },
3684 ) {
3685 for (_, item) in items {
3686 if item
3687 .is_exists_column_for_a_table_function_that_was_in_the_target_list
3688 {
3689 skip_cols.insert(item.column_name.clone());
3690 }
3691 }
3692 }
3693 }
3694 }
3695 let items = fields
3696 .iter()
3697 .filter_map(|(name, _ty)| {
3698 if skip_cols.contains(name) {
3699 None
3700 } else {
3701 let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3702 expr: sql_expr.clone(),
3703 field: name.clone().into(),
3704 }));
3705 Some((item, name.clone()))
3706 }
3707 })
3708 .collect();
3709 Ok(items)
3710 }
3711 SelectItem::Wildcard => {
3712 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3713 let items: Vec<_> = ecx
3714 .scope
3715 .items
3716 .iter()
3717 .enumerate()
3718 .filter(|(_i, item)| item.allow_unqualified_references)
3719 .map(|(i, item)| {
3720 let name = item.column_name.clone();
3721 (ExpandedSelectItem::InputOrdinal(i), name)
3722 })
3723 .collect();
3724
3725 Ok(items)
3726 }
3727 SelectItem::Expr { expr, alias } => {
3728 let name = alias
3729 .clone()
3730 .map(normalize::column_name)
3731 .or_else(|| invent_column_name(ecx, expr, table_func_names))
3732 .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into());
3733 Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3734 }
3735 }
3736}
3737
3738fn plan_join(
3739 left_qcx: &QueryContext,
3740 left: HirRelationExpr,
3741 left_scope: Scope,
3742 join: &Join<Aug>,
3743) -> Result<(HirRelationExpr, Scope), PlanError> {
3744 const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3745 let (kind, constraint) = match &join.join_operator {
3746 JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3747 JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3748 JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3749 JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3750 JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3751 };
3752
3753 let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3754 if !kind.can_be_correlated() {
3755 for item in &mut right_qcx.outer_scopes[0].items {
3756 item.error_if_referenced =
3761 Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3762 table: table.cloned(),
3763 column: column.clone(),
3764 });
3765 }
3766 }
3767 let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3768
3769 let (expr, scope) = match constraint {
3770 JoinConstraint::On(expr) => {
3771 let product_scope = left_scope.product(right_scope)?;
3772 let ecx = &ExprContext {
3773 qcx: left_qcx,
3774 name: "ON clause",
3775 scope: &product_scope,
3776 relation_type: &SqlRelationType::new(
3777 left_qcx
3778 .relation_type(&left)
3779 .column_types
3780 .into_iter()
3781 .chain(right_qcx.relation_type(&right).column_types)
3782 .collect(),
3783 ),
3784 allow_aggregates: false,
3785 allow_subqueries: true,
3786 allow_parameters: true,
3787 allow_windows: false,
3788 };
3789 let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3790 let joined = left.join(right, on, kind);
3791 (joined, product_scope)
3792 }
3793 JoinConstraint::Using { columns, alias } => {
3794 let column_names = columns
3795 .iter()
3796 .map(|ident| normalize::column_name(ident.clone()))
3797 .collect::<Vec<_>>();
3798
3799 plan_using_constraint(
3800 &column_names,
3801 left_qcx,
3802 left,
3803 left_scope,
3804 &right_qcx,
3805 right,
3806 right_scope,
3807 kind,
3808 alias.as_ref(),
3809 )?
3810 }
3811 JoinConstraint::Natural => {
3812 *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3815 *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3816 let left_column_names = left_scope.column_names();
3817 let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3818 let column_names: Vec<_> = left_column_names
3819 .filter(|col| right_column_names.contains(col))
3820 .cloned()
3821 .collect();
3822 plan_using_constraint(
3823 &column_names,
3824 left_qcx,
3825 left,
3826 left_scope,
3827 &right_qcx,
3828 right,
3829 right_scope,
3830 kind,
3831 None,
3832 )?
3833 }
3834 };
3835 Ok((expr, scope))
3836}
3837
3838#[allow(clippy::too_many_arguments)]
3840fn plan_using_constraint(
3841 column_names: &[ColumnName],
3842 left_qcx: &QueryContext,
3843 left: HirRelationExpr,
3844 left_scope: Scope,
3845 right_qcx: &QueryContext,
3846 right: HirRelationExpr,
3847 right_scope: Scope,
3848 kind: JoinKind,
3849 alias: Option<&Ident>,
3850) -> Result<(HirRelationExpr, Scope), PlanError> {
3851 let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3852
3853 let mut unique_column_names = BTreeSet::new();
3856 for c in column_names {
3857 if !unique_column_names.insert(c) {
3858 return Err(PlanError::Unsupported {
3859 feature: format!(
3860 "column name {} appears more than once in USING clause",
3861 c.quoted()
3862 ),
3863 discussion_no: None,
3864 });
3865 }
3866 }
3867
3868 let alias_item_name = alias.map(|alias| PartialItemName {
3869 database: None,
3870 schema: None,
3871 item: alias.clone().to_string(),
3872 });
3873
3874 if let Some(alias_item_name) = &alias_item_name {
3875 for partial_item_name in both_scope.table_names() {
3876 if partial_item_name.matches(alias_item_name) {
3877 sql_bail!(
3878 "table name \"{}\" specified more than once",
3879 alias_item_name
3880 )
3881 }
3882 }
3883 }
3884
3885 let ecx = &ExprContext {
3886 qcx: right_qcx,
3887 name: "USING clause",
3888 scope: &both_scope,
3889 relation_type: &SqlRelationType::new(
3890 left_qcx
3891 .relation_type(&left)
3892 .column_types
3893 .into_iter()
3894 .chain(right_qcx.relation_type(&right).column_types)
3895 .collect(),
3896 ),
3897 allow_aggregates: false,
3898 allow_subqueries: false,
3899 allow_parameters: false,
3900 allow_windows: false,
3901 };
3902
3903 let mut join_exprs = vec![];
3904 let mut map_exprs = vec![];
3905 let mut new_items = vec![];
3906 let mut join_cols = vec![];
3907 let mut hidden_cols = vec![];
3908
3909 for column_name in column_names {
3910 let (lhs, lhs_name) = left_scope.resolve_using_column(
3912 column_name,
3913 JoinSide::Left,
3914 &mut left_qcx.name_manager.borrow_mut(),
3915 )?;
3916 let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3917 column_name,
3918 JoinSide::Right,
3919 &mut right_qcx.name_manager.borrow_mut(),
3920 )?;
3921
3922 rhs.column += left_scope.len();
3924
3925 let mut exprs = coerce_homogeneous_exprs(
3927 &ecx.with_name(&format!(
3928 "NATURAL/USING join column {}",
3929 column_name.quoted()
3930 )),
3931 vec![
3932 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3933 lhs,
3934 Arc::clone(&lhs_name),
3935 )),
3936 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3937 rhs,
3938 Arc::clone(&rhs_name),
3939 )),
3940 ],
3941 None,
3942 )?;
3943 let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3944
3945 match kind {
3946 JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3947 join_cols.push(lhs.column);
3948 hidden_cols.push(rhs.column);
3949 }
3950 JoinKind::RightOuter => {
3951 join_cols.push(rhs.column);
3952 hidden_cols.push(lhs.column);
3953 }
3954 JoinKind::FullOuter => {
3955 join_cols.push(both_scope.items.len() + map_exprs.len());
3958 hidden_cols.push(lhs.column);
3959 hidden_cols.push(rhs.column);
3960 map_exprs.push(HirScalarExpr::call_variadic(
3961 VariadicFunc::Coalesce,
3962 vec![expr1.clone(), expr2.clone()],
3963 ));
3964 new_items.push(ScopeItem::from_column_name(column_name));
3965 }
3966 }
3967
3968 if alias_item_name.is_some() {
3973 let new_item_col = both_scope.items.len() + new_items.len();
3974 join_cols.push(new_item_col);
3975 hidden_cols.push(new_item_col);
3976
3977 new_items.push(ScopeItem::from_name(
3978 alias_item_name.clone(),
3979 column_name.clone().to_string(),
3980 ));
3981
3982 map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
3986 }
3987
3988 join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
3989 }
3990 both_scope.items.extend(new_items);
3991
3992 for c in hidden_cols {
3996 both_scope.items[c].allow_unqualified_references = false;
3997 }
3998
3999 let project_key = join_cols
4001 .into_iter()
4002 .chain(0..both_scope.items.len())
4003 .unique()
4004 .collect::<Vec<_>>();
4005
4006 both_scope = both_scope.project(&project_key);
4007
4008 let on = HirScalarExpr::variadic_and(join_exprs);
4009
4010 let both = left
4011 .join(right, on, kind)
4012 .map(map_exprs)
4013 .project(project_key);
4014 Ok((both, both_scope))
4015}
4016
4017pub fn plan_expr<'a>(
4018 ecx: &'a ExprContext,
4019 e: &Expr<Aug>,
4020) -> Result<CoercibleScalarExpr, PlanError> {
4021 ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
4022}
4023
4024fn plan_expr_inner<'a>(
4025 ecx: &'a ExprContext,
4026 e: &Expr<Aug>,
4027) -> Result<CoercibleScalarExpr, PlanError> {
4028 if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4029 return Ok(HirScalarExpr::named_column(
4031 i,
4032 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4033 )
4034 .into());
4035 }
4036
4037 match e {
4038 Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4040 Ok(plan_identifier(ecx, names)?.into())
4041 }
4042
4043 Expr::Value(val) => plan_literal(val),
4045 Expr::Parameter(n) => plan_parameter(ecx, *n),
4046 Expr::Array(exprs) => plan_array(ecx, exprs, None),
4047 Expr::List(exprs) => plan_list(ecx, exprs, None),
4048 Expr::Map(exprs) => plan_map(ecx, exprs, None),
4049 Expr::Row { exprs } => plan_row(ecx, exprs),
4050
4051 Expr::Op { op, expr1, expr2 } => {
4053 Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4054 }
4055 Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4056 Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4057
4058 Expr::Not { expr } => plan_not(ecx, expr),
4060 Expr::And { left, right } => plan_and(ecx, left, right),
4061 Expr::Or { left, right } => plan_or(ecx, left, right),
4062 Expr::IsExpr {
4063 expr,
4064 construct,
4065 negated,
4066 } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4067 Expr::Case {
4068 operand,
4069 conditions,
4070 results,
4071 else_result,
4072 } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4073 Expr::HomogenizingFunction { function, exprs } => {
4074 plan_homogenizing_function(ecx, function, exprs)
4075 }
4076 Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4077 ecx,
4078 &None,
4079 &[l_expr.clone().equals(*r_expr.clone())],
4080 &[Expr::null()],
4081 &Some(Box::new(*l_expr.clone())),
4082 )?
4083 .into()),
4084 Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4085 Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4086 Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4087 Expr::Like {
4088 expr,
4089 pattern,
4090 escape,
4091 case_insensitive,
4092 negated,
4093 } => Ok(plan_like(
4094 ecx,
4095 expr,
4096 pattern,
4097 escape.as_deref(),
4098 *case_insensitive,
4099 *negated,
4100 )?
4101 .into()),
4102
4103 Expr::InList {
4104 expr,
4105 list,
4106 negated,
4107 } => plan_in_list(ecx, expr, list, negated),
4108
4109 Expr::Exists(query) => plan_exists(ecx, query),
4111 Expr::Subquery(query) => plan_subquery(ecx, query),
4112 Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4113 Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4114 Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4115 Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4116 Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
4117 Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
4118 Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
4119 Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
4120 Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
4121 Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
4122 Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
4123 }
4124}
4125
4126fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4127 if !ecx.allow_parameters {
4128 return Err(PlanError::UnknownParameter(n));
4132 }
4133 if n == 0 || n > 65536 {
4134 return Err(PlanError::UnknownParameter(n));
4135 }
4136 if ecx.param_types().borrow().contains_key(&n) {
4137 Ok(HirScalarExpr::parameter(n).into())
4138 } else {
4139 Ok(CoercibleScalarExpr::Parameter(n))
4140 }
4141}
4142
4143fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4144 let mut out = vec![];
4145 for e in exprs {
4146 out.push(plan_expr(ecx, e)?);
4147 }
4148 Ok(CoercibleScalarExpr::LiteralRecord(out))
4149}
4150
4151fn plan_cast(
4152 ecx: &ExprContext,
4153 expr: &Expr<Aug>,
4154 data_type: &ResolvedDataType,
4155) -> Result<CoercibleScalarExpr, PlanError> {
4156 let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4157 let expr = match expr {
4158 Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4167 Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4168 Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4169 _ => plan_expr(ecx, expr)?,
4170 };
4171 let ecx = &ecx.with_name("CAST");
4172 let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4173 let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4174 Ok(expr.into())
4175}
4176
4177fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4178 let ecx = ecx.with_name("NOT argument");
4179 Ok(plan_expr(&ecx, expr)?
4180 .type_as(&ecx, &SqlScalarType::Bool)?
4181 .call_unary(UnaryFunc::Not(expr_func::Not))
4182 .into())
4183}
4184
4185fn plan_and(
4186 ecx: &ExprContext,
4187 left: &Expr<Aug>,
4188 right: &Expr<Aug>,
4189) -> Result<CoercibleScalarExpr, PlanError> {
4190 let ecx = ecx.with_name("AND argument");
4191 Ok(HirScalarExpr::variadic_and(vec![
4192 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4193 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4194 ])
4195 .into())
4196}
4197
4198fn plan_or(
4199 ecx: &ExprContext,
4200 left: &Expr<Aug>,
4201 right: &Expr<Aug>,
4202) -> Result<CoercibleScalarExpr, PlanError> {
4203 let ecx = ecx.with_name("OR argument");
4204 Ok(HirScalarExpr::variadic_or(vec![
4205 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4206 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4207 ])
4208 .into())
4209}
4210
4211fn plan_in_list(
4212 ecx: &ExprContext,
4213 lhs: &Expr<Aug>,
4214 list: &Vec<Expr<Aug>>,
4215 negated: &bool,
4216) -> Result<CoercibleScalarExpr, PlanError> {
4217 let ecx = ecx.with_name("IN list");
4218 let or = HirScalarExpr::variadic_or(
4219 list.into_iter()
4220 .map(|e| {
4221 let eq = lhs.clone().equals(e.clone());
4222 plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4223 })
4224 .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4225 );
4226 Ok(if *negated {
4227 or.call_unary(UnaryFunc::Not(expr_func::Not))
4228 } else {
4229 or
4230 }
4231 .into())
4232}
4233
4234fn plan_homogenizing_function(
4235 ecx: &ExprContext,
4236 function: &HomogenizingFunction,
4237 exprs: &[Expr<Aug>],
4238) -> Result<CoercibleScalarExpr, PlanError> {
4239 assert!(!exprs.is_empty()); let expr = HirScalarExpr::call_variadic(
4241 match function {
4242 HomogenizingFunction::Coalesce => VariadicFunc::Coalesce,
4243 HomogenizingFunction::Greatest => VariadicFunc::Greatest,
4244 HomogenizingFunction::Least => VariadicFunc::Least,
4245 },
4246 coerce_homogeneous_exprs(
4247 &ecx.with_name(&function.to_string().to_lowercase()),
4248 plan_exprs(ecx, exprs)?,
4249 None,
4250 )?,
4251 );
4252 Ok(expr.into())
4253}
4254
4255fn plan_field_access(
4256 ecx: &ExprContext,
4257 expr: &Expr<Aug>,
4258 field: &Ident,
4259) -> Result<CoercibleScalarExpr, PlanError> {
4260 let field = normalize::column_name(field.clone());
4261 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4262 let ty = ecx.scalar_type(&expr);
4263 let i = match &ty {
4264 SqlScalarType::Record { fields, .. } => {
4265 fields.iter().position(|(name, _ty)| *name == field)
4266 }
4267 ty => sql_bail!(
4268 "column notation applied to type {}, which is not a composite type",
4269 ecx.humanize_scalar_type(ty, false)
4270 ),
4271 };
4272 match i {
4273 None => sql_bail!(
4274 "field {} not found in data type {}",
4275 field,
4276 ecx.humanize_scalar_type(&ty, false)
4277 ),
4278 Some(i) => Ok(expr
4279 .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4280 .into()),
4281 }
4282}
4283
4284fn plan_subscript(
4285 ecx: &ExprContext,
4286 expr: &Expr<Aug>,
4287 positions: &[SubscriptPosition<Aug>],
4288) -> Result<CoercibleScalarExpr, PlanError> {
4289 assert!(
4290 !positions.is_empty(),
4291 "subscript expression must contain at least one position"
4292 );
4293
4294 let ecx = &ecx.with_name("subscripting");
4295 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4296 let ty = ecx.scalar_type(&expr);
4297 match &ty {
4298 SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4299 ecx,
4300 expr,
4301 positions,
4302 if ty == SqlScalarType::Int2Vector {
4306 1
4307 } else {
4308 0
4309 },
4310 ),
4311 SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4312 SqlScalarType::List { element_type, .. } => {
4313 let elem_type_name = ecx.humanize_scalar_type(element_type, false);
4315 let n_layers = ty.unwrap_list_n_layers();
4316 plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4317 }
4318 ty => sql_bail!(
4319 "cannot subscript type {}",
4320 ecx.humanize_scalar_type(ty, false)
4321 ),
4322 }
4323}
4324
4325fn extract_scalar_subscript_from_positions<'a>(
4329 positions: &'a [SubscriptPosition<Aug>],
4330 expr_type_name: &str,
4331) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4332 let mut scalar_subscripts = Vec::with_capacity(positions.len());
4333 for p in positions {
4334 if p.explicit_slice {
4335 sql_bail!("{} subscript does not support slices", expr_type_name);
4336 }
4337 assert!(
4338 p.end.is_none(),
4339 "index-appearing subscripts cannot have end value"
4340 );
4341 scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4342 }
4343 Ok(scalar_subscripts)
4344}
4345
4346fn plan_subscript_array(
4347 ecx: &ExprContext,
4348 expr: HirScalarExpr,
4349 positions: &[SubscriptPosition<Aug>],
4350 offset: i64,
4351) -> Result<CoercibleScalarExpr, PlanError> {
4352 let mut exprs = Vec::with_capacity(positions.len() + 1);
4353 exprs.push(expr);
4354
4355 let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4358
4359 for i in indexes {
4360 exprs.push(plan_expr(ecx, i)?.cast_to(
4361 ecx,
4362 CastContext::Explicit,
4363 &SqlScalarType::Int64,
4364 )?);
4365 }
4366
4367 Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayIndex { offset }, exprs).into())
4368}
4369
4370fn plan_subscript_list(
4371 ecx: &ExprContext,
4372 mut expr: HirScalarExpr,
4373 positions: &[SubscriptPosition<Aug>],
4374 mut remaining_layers: usize,
4375 elem_type_name: &str,
4376) -> Result<CoercibleScalarExpr, PlanError> {
4377 let mut i = 0;
4378
4379 while i < positions.len() {
4380 let j = positions[i..]
4382 .iter()
4383 .position(|p| p.explicit_slice)
4384 .unwrap_or(positions.len() - i);
4385 if j != 0 {
4386 let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4387 let (n, e) = plan_index_list(
4388 ecx,
4389 expr,
4390 indexes.as_slice(),
4391 remaining_layers,
4392 elem_type_name,
4393 )?;
4394 remaining_layers = n;
4395 expr = e;
4396 i += j;
4397 }
4398
4399 let j = positions[i..]
4401 .iter()
4402 .position(|p| !p.explicit_slice)
4403 .unwrap_or(positions.len() - i);
4404 if j != 0 {
4405 expr = plan_slice_list(
4406 ecx,
4407 expr,
4408 &positions[i..i + j],
4409 remaining_layers,
4410 elem_type_name,
4411 )?;
4412 i += j;
4413 }
4414 }
4415
4416 Ok(expr.into())
4417}
4418
4419fn plan_index_list(
4420 ecx: &ExprContext,
4421 expr: HirScalarExpr,
4422 indexes: &[&Expr<Aug>],
4423 n_layers: usize,
4424 elem_type_name: &str,
4425) -> Result<(usize, HirScalarExpr), PlanError> {
4426 let depth = indexes.len();
4427
4428 if depth > n_layers {
4429 if n_layers == 0 {
4430 sql_bail!("cannot subscript type {}", elem_type_name)
4431 } else {
4432 sql_bail!(
4433 "cannot index into {} layers; list only has {} layer{}",
4434 depth,
4435 n_layers,
4436 if n_layers == 1 { "" } else { "s" }
4437 )
4438 }
4439 }
4440
4441 let mut exprs = Vec::with_capacity(depth + 1);
4442 exprs.push(expr);
4443
4444 for i in indexes {
4445 exprs.push(plan_expr(ecx, i)?.cast_to(
4446 ecx,
4447 CastContext::Explicit,
4448 &SqlScalarType::Int64,
4449 )?);
4450 }
4451
4452 Ok((
4453 n_layers - depth,
4454 HirScalarExpr::call_variadic(VariadicFunc::ListIndex, exprs),
4455 ))
4456}
4457
4458fn plan_slice_list(
4459 ecx: &ExprContext,
4460 expr: HirScalarExpr,
4461 slices: &[SubscriptPosition<Aug>],
4462 n_layers: usize,
4463 elem_type_name: &str,
4464) -> Result<HirScalarExpr, PlanError> {
4465 if n_layers == 0 {
4466 sql_bail!("cannot subscript type {}", elem_type_name)
4467 }
4468
4469 let mut exprs = Vec::with_capacity(slices.len() + 1);
4471 exprs.push(expr);
4472 let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4474 Ok(match position {
4475 Some(p) => {
4476 plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4477 }
4478 None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4479 })
4480 };
4481 for p in slices {
4482 let start = extract_position_or_default(p.start.as_ref(), 1)?;
4483 let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4484 exprs.push(start);
4485 exprs.push(end);
4486 }
4487
4488 Ok(HirScalarExpr::call_variadic(
4489 VariadicFunc::ListSliceLinear,
4490 exprs,
4491 ))
4492}
4493
4494fn plan_like(
4495 ecx: &ExprContext,
4496 expr: &Expr<Aug>,
4497 pattern: &Expr<Aug>,
4498 escape: Option<&Expr<Aug>>,
4499 case_insensitive: bool,
4500 not: bool,
4501) -> Result<HirScalarExpr, PlanError> {
4502 use CastContext::Implicit;
4503 let ecx = ecx.with_name("LIKE argument");
4504 let expr = plan_expr(&ecx, expr)?;
4505 let haystack = match ecx.scalar_type(&expr) {
4506 CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4507 .type_as(&ecx, ty)?
4508 .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4509 _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4510 };
4511 let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4512 if let Some(escape) = escape {
4513 pattern = pattern.call_binary(
4514 plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4515 expr_func::LikeEscape,
4516 );
4517 }
4518 let func: BinaryFunc = if case_insensitive {
4519 expr_func::IsLikeMatchCaseInsensitive.into()
4520 } else {
4521 expr_func::IsLikeMatchCaseSensitive.into()
4522 };
4523 let like = haystack.call_binary(pattern, func);
4524 if not {
4525 Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4526 } else {
4527 Ok(like)
4528 }
4529}
4530
4531fn plan_subscript_jsonb(
4532 ecx: &ExprContext,
4533 expr: HirScalarExpr,
4534 positions: &[SubscriptPosition<Aug>],
4535) -> Result<CoercibleScalarExpr, PlanError> {
4536 use CastContext::Implicit;
4537 use SqlScalarType::{Int64, String};
4538
4539 let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4542
4543 let mut exprs = Vec::with_capacity(subscripts.len());
4544 for s in subscripts {
4545 let subscript = plan_expr(ecx, s)?;
4546 let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4547 subscript
4548 } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4549 typeconv::to_string(ecx, subscript)
4553 } else {
4554 sql_bail!("jsonb subscript type must be coercible to integer or text");
4555 };
4556 exprs.push(subscript);
4557 }
4558
4559 let expr = expr.call_binary(
4562 HirScalarExpr::call_variadic(
4563 VariadicFunc::ArrayCreate {
4564 elem_type: SqlScalarType::String,
4565 },
4566 exprs,
4567 ),
4568 expr_func::JsonbGetPath,
4569 );
4570 Ok(expr.into())
4571}
4572
4573fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4574 if !ecx.allow_subqueries {
4575 sql_bail!("{} does not allow subqueries", ecx.name)
4576 }
4577 let mut qcx = ecx.derived_query_context();
4578 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4579 Ok(expr.exists().into())
4580}
4581
4582fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4583 if !ecx.allow_subqueries {
4584 sql_bail!("{} does not allow subqueries", ecx.name)
4585 }
4586 let mut qcx = ecx.derived_query_context();
4587 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4588 let column_types = qcx.relation_type(&expr).column_types;
4589 if column_types.len() != 1 {
4590 sql_bail!(
4591 "Expected subselect to return 1 column, got {} columns",
4592 column_types.len()
4593 );
4594 }
4595 Ok(expr.select().into())
4596}
4597
4598fn plan_list_subquery(
4599 ecx: &ExprContext,
4600 query: &Query<Aug>,
4601) -> Result<CoercibleScalarExpr, PlanError> {
4602 plan_vector_like_subquery(
4603 ecx,
4604 query,
4605 |_| false,
4606 |elem_type| VariadicFunc::ListCreate { elem_type },
4607 |order_by| AggregateFunc::ListConcat { order_by },
4608 expr_func::ListListConcat.into(),
4609 |elem_type| {
4610 HirScalarExpr::literal(
4611 Datum::empty_list(),
4612 SqlScalarType::List {
4613 element_type: Box::new(elem_type),
4614 custom_id: None,
4615 },
4616 )
4617 },
4618 "list",
4619 )
4620}
4621
4622fn plan_array_subquery(
4623 ecx: &ExprContext,
4624 query: &Query<Aug>,
4625) -> Result<CoercibleScalarExpr, PlanError> {
4626 plan_vector_like_subquery(
4627 ecx,
4628 query,
4629 |elem_type| {
4630 matches!(
4631 elem_type,
4632 SqlScalarType::Char { .. }
4633 | SqlScalarType::Array { .. }
4634 | SqlScalarType::List { .. }
4635 | SqlScalarType::Map { .. }
4636 )
4637 },
4638 |elem_type| VariadicFunc::ArrayCreate { elem_type },
4639 |order_by| AggregateFunc::ArrayConcat { order_by },
4640 expr_func::ArrayArrayConcat.into(),
4641 |elem_type| {
4642 HirScalarExpr::literal(
4643 Datum::empty_array(),
4644 SqlScalarType::Array(Box::new(elem_type)),
4645 )
4646 },
4647 "[]",
4648 )
4649}
4650
4651fn plan_vector_like_subquery<F1, F2, F3, F4>(
4653 ecx: &ExprContext,
4654 query: &Query<Aug>,
4655 is_unsupported_type: F1,
4656 vector_create: F2,
4657 aggregate_concat: F3,
4658 binary_concat: BinaryFunc,
4659 empty_literal: F4,
4660 vector_type_string: &str,
4661) -> Result<CoercibleScalarExpr, PlanError>
4662where
4663 F1: Fn(&SqlScalarType) -> bool,
4664 F2: Fn(SqlScalarType) -> VariadicFunc,
4665 F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4666 F4: Fn(SqlScalarType) -> HirScalarExpr,
4667{
4668 if !ecx.allow_subqueries {
4669 sql_bail!("{} does not allow subqueries", ecx.name)
4670 }
4671
4672 let mut qcx = ecx.derived_query_context();
4673 let mut planned_query = plan_query(&mut qcx, query)?;
4674 if planned_query.limit.is_some()
4675 || !planned_query
4676 .offset
4677 .clone()
4678 .try_into_literal_int64()
4679 .is_ok_and(|offset| offset == 0)
4680 {
4681 planned_query.expr = HirRelationExpr::top_k(
4682 planned_query.expr,
4683 vec![],
4684 planned_query.order_by.clone(),
4685 planned_query.limit,
4686 planned_query.offset,
4687 planned_query.group_size_hints.limit_input_group_size,
4688 );
4689 }
4690
4691 if planned_query.project.len() != 1 {
4692 sql_bail!(
4693 "Expected subselect to return 1 column, got {} columns",
4694 planned_query.project.len()
4695 );
4696 }
4697
4698 let project_column = *planned_query.project.get(0).unwrap();
4699 let elem_type = qcx
4700 .relation_type(&planned_query.expr)
4701 .column_types
4702 .get(project_column)
4703 .cloned()
4704 .unwrap()
4705 .scalar_type();
4706
4707 if is_unsupported_type(&elem_type) {
4708 bail_unsupported!(format!(
4709 "cannot build array from subquery because return type {}{}",
4710 ecx.humanize_scalar_type(&elem_type, false),
4711 vector_type_string
4712 ));
4713 }
4714
4715 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4718 vector_create(elem_type.clone()),
4719 vec![HirScalarExpr::column(project_column)],
4720 ))
4721 .chain(
4722 planned_query
4723 .order_by
4724 .iter()
4725 .map(|co| HirScalarExpr::column(co.column)),
4726 )
4727 .collect();
4728
4729 let aggregation_projection = vec![0];
4733 let aggregation_order_by = planned_query
4734 .order_by
4735 .into_iter()
4736 .enumerate()
4737 .map(|(i, order)| ColumnOrder { column: i, ..order })
4738 .collect();
4739
4740 let reduced_expr = planned_query
4741 .expr
4742 .reduce(
4743 vec![],
4744 vec![AggregateExpr {
4745 func: aggregate_concat(aggregation_order_by),
4746 expr: Box::new(HirScalarExpr::call_variadic(
4747 VariadicFunc::RecordCreate {
4748 field_names: iter::repeat(ColumnName::from(""))
4749 .take(aggregation_exprs.len())
4750 .collect(),
4751 },
4752 aggregation_exprs,
4753 )),
4754 distinct: false,
4755 }],
4756 None,
4757 )
4758 .project(aggregation_projection);
4759
4760 Ok(reduced_expr
4762 .select()
4763 .call_binary(empty_literal(elem_type), binary_concat)
4764 .into())
4765}
4766
4767fn plan_map_subquery(
4768 ecx: &ExprContext,
4769 query: &Query<Aug>,
4770) -> Result<CoercibleScalarExpr, PlanError> {
4771 if !ecx.allow_subqueries {
4772 sql_bail!("{} does not allow subqueries", ecx.name)
4773 }
4774
4775 let mut qcx = ecx.derived_query_context();
4776 let mut query = plan_query(&mut qcx, query)?;
4777 if query.limit.is_some()
4778 || !query
4779 .offset
4780 .clone()
4781 .try_into_literal_int64()
4782 .is_ok_and(|offset| offset == 0)
4783 {
4784 query.expr = HirRelationExpr::top_k(
4785 query.expr,
4786 vec![],
4787 query.order_by.clone(),
4788 query.limit,
4789 query.offset,
4790 query.group_size_hints.limit_input_group_size,
4791 );
4792 }
4793 if query.project.len() != 2 {
4794 sql_bail!(
4795 "expected map subquery to return 2 columns, got {} columns",
4796 query.project.len()
4797 );
4798 }
4799
4800 let query_types = qcx.relation_type(&query.expr).column_types;
4801 let key_column = query.project[0];
4802 let key_type = query_types[key_column].clone().scalar_type();
4803 let value_column = query.project[1];
4804 let value_type = query_types[value_column].clone().scalar_type();
4805
4806 if key_type != SqlScalarType::String {
4807 sql_bail!("cannot build map from subquery because first column is not of type text");
4808 }
4809
4810 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4811 VariadicFunc::RecordCreate {
4812 field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4813 },
4814 vec![
4815 HirScalarExpr::column(key_column),
4816 HirScalarExpr::column(value_column),
4817 ],
4818 ))
4819 .chain(
4820 query
4821 .order_by
4822 .iter()
4823 .map(|co| HirScalarExpr::column(co.column)),
4824 )
4825 .collect();
4826
4827 let expr = query
4828 .expr
4829 .reduce(
4830 vec![],
4831 vec![AggregateExpr {
4832 func: AggregateFunc::MapAgg {
4833 order_by: query
4834 .order_by
4835 .into_iter()
4836 .enumerate()
4837 .map(|(i, order)| ColumnOrder { column: i, ..order })
4838 .collect(),
4839 value_type: value_type.clone(),
4840 },
4841 expr: Box::new(HirScalarExpr::call_variadic(
4842 VariadicFunc::RecordCreate {
4843 field_names: iter::repeat(ColumnName::from(""))
4844 .take(aggregation_exprs.len())
4845 .collect(),
4846 },
4847 aggregation_exprs,
4848 )),
4849 distinct: false,
4850 }],
4851 None,
4852 )
4853 .project(vec![0]);
4854
4855 let expr = HirScalarExpr::call_variadic(
4857 VariadicFunc::Coalesce,
4858 vec![
4859 expr.select(),
4860 HirScalarExpr::literal(
4861 Datum::empty_map(),
4862 SqlScalarType::Map {
4863 value_type: Box::new(value_type),
4864 custom_id: None,
4865 },
4866 ),
4867 ],
4868 );
4869
4870 Ok(expr.into())
4871}
4872
4873fn plan_collate(
4874 ecx: &ExprContext,
4875 expr: &Expr<Aug>,
4876 collation: &UnresolvedItemName,
4877) -> Result<CoercibleScalarExpr, PlanError> {
4878 if collation.0.len() == 2
4879 && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4880 && collation.0[1] == ident!("default")
4881 {
4882 plan_expr(ecx, expr)
4883 } else {
4884 bail_unsupported!("COLLATE");
4885 }
4886}
4887
4888fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4895where
4896 E: std::borrow::Borrow<Expr<Aug>>,
4897{
4898 let mut out = vec![];
4899 for expr in exprs {
4900 out.push(plan_expr(ecx, expr.borrow())?);
4901 }
4902 Ok(out)
4903}
4904
4905fn plan_array(
4907 ecx: &ExprContext,
4908 exprs: &[Expr<Aug>],
4909 type_hint: Option<&SqlScalarType>,
4910) -> Result<CoercibleScalarExpr, PlanError> {
4911 let mut out = vec![];
4913 for expr in exprs {
4914 out.push(match expr {
4915 Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4918 _ => plan_expr(ecx, expr)?,
4919 });
4920 }
4921
4922 let type_hint = match type_hint {
4924 Some(SqlScalarType::Array(elem_type)) => {
4929 let multidimensional = out.iter().any(|e| {
4930 matches!(
4931 ecx.scalar_type(e),
4932 CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4933 )
4934 });
4935 if multidimensional {
4936 type_hint
4937 } else {
4938 Some(&**elem_type)
4939 }
4940 }
4941 Some(_) => None,
4945 None => None,
4947 };
4948
4949 let (elem_type, exprs) = if exprs.is_empty() {
4951 if let Some(elem_type) = type_hint {
4952 (elem_type.clone(), vec![])
4953 } else {
4954 sql_bail!("cannot determine type of empty array");
4955 }
4956 } else {
4957 let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4958 (ecx.scalar_type(&out[0]), out)
4959 };
4960
4961 if matches!(
4967 elem_type,
4968 SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4969 ) {
4970 bail_unsupported!(format!("{}[]", ecx.humanize_scalar_type(&elem_type, false)));
4971 }
4972
4973 Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayCreate { elem_type }, exprs).into())
4974}
4975
4976fn plan_list(
4977 ecx: &ExprContext,
4978 exprs: &[Expr<Aug>],
4979 type_hint: Option<&SqlScalarType>,
4980) -> Result<CoercibleScalarExpr, PlanError> {
4981 let (elem_type, exprs) = if exprs.is_empty() {
4982 if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
4983 (element_type.without_modifiers(), vec![])
4984 } else {
4985 sql_bail!("cannot determine type of empty list");
4986 }
4987 } else {
4988 let type_hint = match type_hint {
4989 Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
4990 _ => None,
4991 };
4992
4993 let mut out = vec![];
4994 for expr in exprs {
4995 out.push(match expr {
4996 Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
4999 _ => plan_expr(ecx, expr)?,
5000 });
5001 }
5002 let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
5003 (ecx.scalar_type(&out[0]).without_modifiers(), out)
5004 };
5005
5006 if matches!(elem_type, SqlScalarType::Char { .. }) {
5007 bail_unsupported!("char list");
5008 }
5009
5010 Ok(HirScalarExpr::call_variadic(VariadicFunc::ListCreate { elem_type }, exprs).into())
5011}
5012
5013fn plan_map(
5014 ecx: &ExprContext,
5015 entries: &[MapEntry<Aug>],
5016 type_hint: Option<&SqlScalarType>,
5017) -> Result<CoercibleScalarExpr, PlanError> {
5018 let (value_type, exprs) = if entries.is_empty() {
5019 if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
5020 (value_type.without_modifiers(), vec![])
5021 } else {
5022 sql_bail!("cannot determine type of empty map");
5023 }
5024 } else {
5025 let type_hint = match type_hint {
5026 Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5027 _ => None,
5028 };
5029
5030 let mut keys = vec![];
5031 let mut values = vec![];
5032 for MapEntry { key, value } in entries {
5033 let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5034 let value = match value {
5035 Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5038 _ => plan_expr(ecx, value)?,
5039 };
5040 keys.push(key);
5041 values.push(value);
5042 }
5043 let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5044 let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5045 let out = itertools::interleave(keys, values).collect();
5046 (value_type, out)
5047 };
5048
5049 if matches!(value_type, SqlScalarType::Char { .. }) {
5050 bail_unsupported!("char map");
5051 }
5052
5053 let expr = HirScalarExpr::call_variadic(VariadicFunc::MapBuild { value_type }, exprs);
5054 Ok(expr.into())
5055}
5056
5057pub fn coerce_homogeneous_exprs(
5074 ecx: &ExprContext,
5075 exprs: Vec<CoercibleScalarExpr>,
5076 force_type: Option<&SqlScalarType>,
5077) -> Result<Vec<HirScalarExpr>, PlanError> {
5078 assert!(!exprs.is_empty());
5079
5080 let target_holder;
5081 let target = match force_type {
5082 Some(t) => t,
5083 None => {
5084 let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5085 target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5086 &target_holder
5087 }
5088 };
5089
5090 let mut out = Vec::new();
5092 for expr in exprs {
5093 let arg = typeconv::plan_coerce(ecx, expr, target)?;
5094 let ccx = match force_type {
5095 None => CastContext::Implicit,
5096 Some(_) => CastContext::Explicit,
5097 };
5098 match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5099 Ok(expr) => out.push(expr),
5100 Err(_) => sql_bail!(
5101 "{} could not convert type {} to {}",
5102 ecx.name,
5103 ecx.humanize_scalar_type(&ecx.scalar_type(&arg), false),
5104 ecx.humanize_scalar_type(target, false),
5105 ),
5106 }
5107 }
5108 Ok(out)
5109}
5110
5111pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5114 obe: &OrderByExpr<T>,
5115 column: usize,
5116) -> ColumnOrder {
5117 let desc = !obe.asc.unwrap_or(true);
5118 ColumnOrder {
5119 column,
5120 desc,
5121 nulls_last: obe.nulls_last.unwrap_or(!desc),
5124 }
5125}
5126
5127fn plan_function_order_by(
5135 ecx: &ExprContext,
5136 order_by: &[OrderByExpr<Aug>],
5137) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5138 let mut order_by_exprs = vec![];
5139 let mut col_orders = vec![];
5140 {
5141 for (i, obe) in order_by.iter().enumerate() {
5142 let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5146 order_by_exprs.push(expr);
5147 col_orders.push(resolve_desc_and_nulls_last(obe, i));
5148 }
5149 }
5150 Ok((order_by_exprs, col_orders))
5151}
5152
5153fn plan_aggregate_common(
5155 ecx: &ExprContext,
5156 Function::<Aug> {
5157 name,
5158 args,
5159 filter,
5160 over: _,
5161 distinct,
5162 }: &Function<Aug>,
5163) -> Result<AggregateExpr, PlanError> {
5164 let impls = match resolve_func(ecx, name, args)? {
5179 Func::Aggregate(impls) => impls,
5180 _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
5181 };
5182
5183 let (args, order_by) = match &args {
5192 FunctionArgs::Star => (vec![], vec![]),
5193 FunctionArgs::Args { args, order_by } => {
5194 if args.is_empty() {
5195 sql_bail!(
5196 "{}(*) must be used to call a parameterless aggregate function",
5197 ecx.qcx
5198 .scx
5199 .humanize_resolved_name(name)
5200 .expect("name actually resolved")
5201 );
5202 }
5203 let args = plan_exprs(ecx, args)?;
5204 (args, order_by.clone())
5205 }
5206 };
5207
5208 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5209
5210 let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5211 if let Some(filter) = &filter {
5212 let cond =
5222 plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5223 let expr_typ = ecx.scalar_type(&expr);
5224 expr = HirScalarExpr::if_then_else(
5225 cond,
5226 expr,
5227 HirScalarExpr::literal(func.identity_datum(), expr_typ),
5228 );
5229 }
5230
5231 let mut seen_outer = false;
5232 let mut seen_inner = false;
5233 #[allow(deprecated)]
5234 expr.visit_columns(0, &mut |depth, col| {
5235 if depth == 0 && col.level == 0 {
5236 seen_inner = true;
5237 } else if col.level > depth {
5238 seen_outer = true;
5239 }
5240 });
5241 if seen_outer && !seen_inner {
5242 bail_unsupported!(
5243 3720,
5244 "aggregate functions that refer exclusively to outer columns"
5245 );
5246 }
5247
5248 if func.is_order_sensitive() {
5251 let field_names = iter::repeat(ColumnName::from(""))
5252 .take(1 + order_by_exprs.len())
5253 .collect();
5254 let mut exprs = vec![expr];
5255 exprs.extend(order_by_exprs);
5256 expr = HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs);
5257 }
5258
5259 Ok(AggregateExpr {
5260 func,
5261 expr: Box::new(expr),
5262 distinct: *distinct,
5263 })
5264}
5265
5266fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
5267 let mut names = names.to_vec();
5268 let col_name = normalize::column_name(names.pop().unwrap());
5269
5270 if !names.is_empty() {
5272 let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
5273 let (i, i_name) = ecx.scope.resolve_table_column(
5274 &ecx.qcx.outer_scopes,
5275 &table_name,
5276 &col_name,
5277 &mut ecx.qcx.name_manager.borrow_mut(),
5278 )?;
5279 return Ok(HirScalarExpr::named_column(i, i_name));
5280 }
5281
5282 let similar_names = match ecx.scope.resolve_column(
5285 &ecx.qcx.outer_scopes,
5286 &col_name,
5287 &mut ecx.qcx.name_manager.borrow_mut(),
5288 ) {
5289 Ok((i, i_name)) => {
5290 return Ok(HirScalarExpr::named_column(i, i_name));
5291 }
5292 Err(PlanError::UnknownColumn { similar, .. }) => similar,
5293 Err(e) => return Err(e),
5294 };
5295
5296 let items = ecx.scope.items_from_table(
5299 &ecx.qcx.outer_scopes,
5300 &PartialItemName {
5301 database: None,
5302 schema: None,
5303 item: col_name.as_str().to_owned(),
5304 },
5305 )?;
5306 match items.as_slice() {
5307 [] => Err(PlanError::UnknownColumn {
5309 table: None,
5310 column: col_name,
5311 similar: similar_names,
5312 }),
5313 [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
5318 *column,
5319 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5320 )),
5321 _ => {
5324 let mut has_exists_column = None;
5325 let (exprs, field_names): (Vec<_>, Vec<_>) = items
5326 .into_iter()
5327 .filter_map(|(column, item)| {
5328 if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
5329 has_exists_column = Some(column);
5330 None
5331 } else {
5332 let expr = HirScalarExpr::named_column(
5333 column,
5334 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5335 );
5336 let name = item.column_name.clone();
5337 Some((expr, name))
5338 }
5339 })
5340 .unzip();
5341 let expr = if exprs.len() == 1 && has_exists_column.is_some() {
5343 exprs.into_element()
5344 } else {
5345 HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs)
5346 };
5347 if let Some(has_exists_column) = has_exists_column {
5348 Ok(HirScalarExpr::if_then_else(
5349 HirScalarExpr::unnamed_column(has_exists_column)
5350 .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
5351 HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
5352 expr,
5353 ))
5354 } else {
5355 Ok(expr)
5356 }
5357 }
5358 }
5359}
5360
5361fn plan_op(
5362 ecx: &ExprContext,
5363 op: &str,
5364 expr1: &Expr<Aug>,
5365 expr2: Option<&Expr<Aug>>,
5366) -> Result<HirScalarExpr, PlanError> {
5367 let impls = func::resolve_op(op)?;
5368 let args = match expr2 {
5369 None => plan_exprs(ecx, &[expr1])?,
5370 Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5371 };
5372 func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5373}
5374
5375fn plan_function<'a>(
5376 ecx: &ExprContext,
5377 f @ Function {
5378 name,
5379 args,
5380 filter,
5381 over,
5382 distinct,
5383 }: &'a Function<Aug>,
5384) -> Result<HirScalarExpr, PlanError> {
5385 let impls = match resolve_func(ecx, name, args)? {
5386 Func::Table(_) => {
5387 sql_bail!(
5388 "table functions are not allowed in {} (function {})",
5389 ecx.name,
5390 name
5391 );
5392 }
5393 Func::Scalar(impls) => {
5394 if over.is_some() {
5395 sql_bail!(
5396 "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
5397 );
5398 }
5399 impls
5400 }
5401 Func::ScalarWindow(impls) => {
5402 let (
5403 ignore_nulls,
5404 order_by_exprs,
5405 col_orders,
5406 _window_frame,
5407 partition_by,
5408 scalar_args,
5409 ) = plan_window_function_non_aggr(ecx, f)?;
5410
5411 if !scalar_args.is_empty() {
5415 if let ResolvedItemName::Item {
5416 full_name: FullItemName { item, .. },
5417 ..
5418 } = name
5419 {
5420 sql_bail!(
5421 "function {} has 0 parameters, but was called with {}",
5422 item,
5423 scalar_args.len()
5424 );
5425 }
5426 }
5427
5428 let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5433
5434 if ignore_nulls {
5435 bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5438 }
5439
5440 return Ok(HirScalarExpr::windowing(WindowExpr {
5441 func: WindowExprType::Scalar(ScalarWindowExpr {
5442 func,
5443 order_by: col_orders,
5444 }),
5445 partition_by,
5446 order_by: order_by_exprs,
5447 }));
5448 }
5449 Func::ValueWindow(impls) => {
5450 let window_plan = plan_window_function_non_aggr(ecx, f)?;
5451 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, win_args) =
5452 window_plan;
5453
5454 let (args_encoded, func) =
5455 func::select_impl(ecx, FuncSpec::Func(name), impls, win_args, vec![])?;
5456
5457 if ignore_nulls {
5458 match func {
5459 ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
5460 _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
5461 }
5462 }
5463
5464 return Ok(HirScalarExpr::windowing(WindowExpr {
5465 func: WindowExprType::Value(ValueWindowExpr {
5466 func,
5467 args: Box::new(args_encoded),
5468 order_by: col_orders,
5469 window_frame,
5470 ignore_nulls, }),
5472 partition_by,
5473 order_by: order_by_exprs,
5474 }));
5475 }
5476 Func::Aggregate(_) => {
5477 if f.over.is_none() {
5478 if ecx.allow_aggregates {
5480 sql_bail!(
5483 "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
5484 name,
5485 );
5486 } else {
5487 sql_bail!(
5490 "aggregate functions are not allowed in {} (function {})",
5491 ecx.name,
5492 name
5493 );
5494 }
5495 } else {
5496 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
5497 plan_window_function_common(ecx, &f.name, &f.over)?;
5498
5499 match (&window_frame.start_bound, &window_frame.end_bound) {
5501 (
5502 mz_expr::WindowFrameBound::UnboundedPreceding,
5503 mz_expr::WindowFrameBound::OffsetPreceding(..),
5504 )
5505 | (
5506 mz_expr::WindowFrameBound::UnboundedPreceding,
5507 mz_expr::WindowFrameBound::OffsetFollowing(..),
5508 )
5509 | (
5510 mz_expr::WindowFrameBound::OffsetPreceding(..),
5511 mz_expr::WindowFrameBound::UnboundedFollowing,
5512 )
5513 | (
5514 mz_expr::WindowFrameBound::OffsetFollowing(..),
5515 mz_expr::WindowFrameBound::UnboundedFollowing,
5516 ) => bail_unsupported!("mixed unbounded - offset frames"),
5517 (_, _) => {} }
5519
5520 if ignore_nulls {
5521 bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5525 }
5526
5527 let aggregate_expr = plan_aggregate_common(ecx, f)?;
5528
5529 if aggregate_expr.distinct {
5530 bail_unsupported!("DISTINCT in window aggregates");
5532 }
5533
5534 return Ok(HirScalarExpr::windowing(WindowExpr {
5535 func: WindowExprType::Aggregate(AggregateWindowExpr {
5536 aggregate_expr,
5537 order_by: col_orders,
5538 window_frame,
5539 }),
5540 partition_by,
5541 order_by: order_by_exprs,
5542 }));
5543 }
5544 }
5545 };
5546
5547 if over.is_some() {
5548 unreachable!("If there is an OVER clause, we should have returned already above.");
5549 }
5550
5551 if *distinct {
5552 sql_bail!(
5553 "DISTINCT specified, but {} is not an aggregate function",
5554 ecx.qcx
5555 .scx
5556 .humanize_resolved_name(name)
5557 .expect("already resolved")
5558 );
5559 }
5560 if filter.is_some() {
5561 sql_bail!(
5562 "FILTER specified, but {} is not an aggregate function",
5563 ecx.qcx
5564 .scx
5565 .humanize_resolved_name(name)
5566 .expect("already resolved")
5567 );
5568 }
5569
5570 let scalar_args = match &args {
5571 FunctionArgs::Star => {
5572 sql_bail!(
5573 "* argument is invalid with non-aggregate function {}",
5574 ecx.qcx
5575 .scx
5576 .humanize_resolved_name(name)
5577 .expect("already resolved")
5578 )
5579 }
5580 FunctionArgs::Args { args, order_by } => {
5581 if !order_by.is_empty() {
5582 sql_bail!(
5583 "ORDER BY specified, but {} is not an aggregate function",
5584 ecx.qcx
5585 .scx
5586 .humanize_resolved_name(name)
5587 .expect("already resolved")
5588 );
5589 }
5590 plan_exprs(ecx, args)?
5591 }
5592 };
5593
5594 func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
5595}
5596
5597pub const IGNORE_NULLS_ERROR_MSG: &str =
5598 "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5599
5600pub fn resolve_func(
5604 ecx: &ExprContext,
5605 name: &ResolvedItemName,
5606 args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5607) -> Result<&'static Func, PlanError> {
5608 if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5609 if let Ok(f) = i.func() {
5610 return Ok(f);
5611 }
5612 }
5613
5614 let cexprs = match args {
5617 mz_sql_parser::ast::FunctionArgs::Star => vec![],
5618 mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5619 if !order_by.is_empty() {
5620 sql_bail!(
5621 "ORDER BY specified, but {} is not an aggregate function",
5622 name
5623 );
5624 }
5625 plan_exprs(ecx, args)?
5626 }
5627 };
5628
5629 let arg_types: Vec<_> = cexprs
5630 .into_iter()
5631 .map(|ty| match ecx.scalar_type(&ty) {
5632 CoercibleScalarType::Coerced(ty) => ecx.humanize_scalar_type(&ty, false),
5633 CoercibleScalarType::Record(_) => "record".to_string(),
5634 CoercibleScalarType::Uncoerced => "unknown".to_string(),
5635 })
5636 .collect();
5637
5638 Err(PlanError::UnknownFunction {
5639 name: name.to_string(),
5640 arg_types,
5641 })
5642}
5643
5644fn plan_is_expr<'a>(
5645 ecx: &ExprContext,
5646 expr: &'a Expr<Aug>,
5647 construct: &IsExprConstruct<Aug>,
5648 not: bool,
5649) -> Result<HirScalarExpr, PlanError> {
5650 let expr_hir = plan_expr(ecx, expr)?;
5651
5652 let mut result = match construct {
5653 IsExprConstruct::Null => {
5654 expr_hir.type_as_any(ecx)?.call_is_null()
5659 }
5660 IsExprConstruct::Unknown => expr_hir.type_as(ecx, &SqlScalarType::Bool)?.call_is_null(),
5661 IsExprConstruct::True => expr_hir
5662 .type_as(ecx, &SqlScalarType::Bool)?
5663 .call_unary(UnaryFunc::IsTrue(expr_func::IsTrue)),
5664 IsExprConstruct::False => expr_hir
5665 .type_as(ecx, &SqlScalarType::Bool)?
5666 .call_unary(UnaryFunc::IsFalse(expr_func::IsFalse)),
5667 IsExprConstruct::DistinctFrom(expr2) => {
5668 let ne_ast = expr.clone().not_equals(expr2.as_ref().clone());
5679 let ne_hir = plan_expr(ecx, &ne_ast)?.type_as_any(ecx)?;
5680
5681 let expr1_hir = expr_hir.type_as_any(ecx)?;
5682 let expr2_hir = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5683
5684 let term1 = HirScalarExpr::variadic_or(vec![
5685 ne_hir,
5686 expr1_hir.clone().call_is_null(),
5687 expr2_hir.clone().call_is_null(),
5688 ]);
5689 let term2 = HirScalarExpr::variadic_or(vec![
5690 expr1_hir.call_is_null().not(),
5691 expr2_hir.call_is_null().not(),
5692 ]);
5693 term1.and(term2)
5694 }
5695 };
5696 if not {
5697 result = result.not();
5698 }
5699 Ok(result)
5700}
5701
5702fn plan_case<'a>(
5703 ecx: &ExprContext,
5704 operand: &'a Option<Box<Expr<Aug>>>,
5705 conditions: &'a [Expr<Aug>],
5706 results: &'a [Expr<Aug>],
5707 else_result: &'a Option<Box<Expr<Aug>>>,
5708) -> Result<HirScalarExpr, PlanError> {
5709 let mut cond_exprs = Vec::new();
5710 let mut result_exprs = Vec::new();
5711 for (c, r) in conditions.iter().zip_eq(results) {
5712 let c = match operand {
5713 Some(operand) => operand.clone().equals(c.clone()),
5714 None => c.clone(),
5715 };
5716 let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5717 cond_exprs.push(cexpr);
5718 result_exprs.push(r);
5719 }
5720 result_exprs.push(match else_result {
5721 Some(else_result) => else_result,
5722 None => &Expr::Value(Value::Null),
5723 });
5724 let mut result_exprs = coerce_homogeneous_exprs(
5725 &ecx.with_name("CASE"),
5726 plan_exprs(ecx, &result_exprs)?,
5727 None,
5728 )?;
5729 let mut expr = result_exprs.pop().unwrap();
5730 assert_eq!(cond_exprs.len(), result_exprs.len());
5731 for (cexpr, rexpr) in cond_exprs
5732 .into_iter()
5733 .rev()
5734 .zip_eq(result_exprs.into_iter().rev())
5735 {
5736 expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5737 }
5738 Ok(expr)
5739}
5740
5741fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5742 let (datum, scalar_type) = match l {
5743 Value::Number(s) => {
5744 let d = strconv::parse_numeric(s.as_str())?;
5745 if !s.contains(&['E', '.'][..]) {
5746 if let Ok(n) = d.0.try_into() {
5748 (Datum::Int32(n), SqlScalarType::Int32)
5749 } else if let Ok(n) = d.0.try_into() {
5750 (Datum::Int64(n), SqlScalarType::Int64)
5751 } else {
5752 (
5753 Datum::Numeric(d),
5754 SqlScalarType::Numeric { max_scale: None },
5755 )
5756 }
5757 } else {
5758 (
5759 Datum::Numeric(d),
5760 SqlScalarType::Numeric { max_scale: None },
5761 )
5762 }
5763 }
5764 Value::HexString(_) => bail_unsupported!("hex string literals"),
5765 Value::Boolean(b) => match b {
5766 false => (Datum::False, SqlScalarType::Bool),
5767 true => (Datum::True, SqlScalarType::Bool),
5768 },
5769 Value::Interval(i) => {
5770 let i = literal::plan_interval(i)?;
5771 (Datum::Interval(i), SqlScalarType::Interval)
5772 }
5773 Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5774 Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5775 };
5776 let expr = HirScalarExpr::literal(datum, scalar_type);
5777 Ok(expr.into())
5778}
5779
5780fn plan_window_function_non_aggr<'a>(
5783 ecx: &ExprContext,
5784 Function {
5785 name,
5786 args,
5787 filter,
5788 over,
5789 distinct,
5790 }: &'a Function<Aug>,
5791) -> Result<
5792 (
5793 bool,
5794 Vec<HirScalarExpr>,
5795 Vec<ColumnOrder>,
5796 mz_expr::WindowFrame,
5797 Vec<HirScalarExpr>,
5798 Vec<CoercibleScalarExpr>,
5799 ),
5800 PlanError,
5801> {
5802 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5803 plan_window_function_common(ecx, name, over)?;
5804
5805 if *distinct {
5806 sql_bail!(
5807 "DISTINCT specified, but {} is not an aggregate function",
5808 name
5809 );
5810 }
5811
5812 if filter.is_some() {
5813 bail_unsupported!("FILTER in non-aggregate window functions");
5814 }
5815
5816 let scalar_args = match &args {
5817 FunctionArgs::Star => {
5818 sql_bail!("* argument is invalid with non-aggregate function {}", name)
5819 }
5820 FunctionArgs::Args { args, order_by } => {
5821 if !order_by.is_empty() {
5822 sql_bail!(
5823 "ORDER BY specified, but {} is not an aggregate function",
5824 name
5825 );
5826 }
5827 plan_exprs(ecx, args)?
5828 }
5829 };
5830
5831 Ok((
5832 ignore_nulls,
5833 order_by_exprs,
5834 col_orders,
5835 window_frame,
5836 partition,
5837 scalar_args,
5838 ))
5839}
5840
5841fn plan_window_function_common(
5843 ecx: &ExprContext,
5844 name: &<Aug as AstInfo>::ItemName,
5845 over: &Option<WindowSpec<Aug>>,
5846) -> Result<
5847 (
5848 bool,
5849 Vec<HirScalarExpr>,
5850 Vec<ColumnOrder>,
5851 mz_expr::WindowFrame,
5852 Vec<HirScalarExpr>,
5853 ),
5854 PlanError,
5855> {
5856 if !ecx.allow_windows {
5857 sql_bail!(
5858 "window functions are not allowed in {} (function {})",
5859 ecx.name,
5860 name
5861 );
5862 }
5863
5864 let window_spec = match over.as_ref() {
5865 Some(over) => over,
5866 None => sql_bail!("window function {} requires an OVER clause", name),
5867 };
5868 if window_spec.ignore_nulls && window_spec.respect_nulls {
5869 sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5870 }
5871 let window_frame = match window_spec.window_frame.as_ref() {
5872 Some(frame) => plan_window_frame(frame)?,
5873 None => mz_expr::WindowFrame::default(),
5874 };
5875 let mut partition = Vec::new();
5876 for expr in &window_spec.partition_by {
5877 partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5878 }
5879
5880 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5881
5882 Ok((
5883 window_spec.ignore_nulls,
5884 order_by_exprs,
5885 col_orders,
5886 window_frame,
5887 partition,
5888 ))
5889}
5890
5891fn plan_window_frame(
5892 WindowFrame {
5893 units,
5894 start_bound,
5895 end_bound,
5896 }: &WindowFrame,
5897) -> Result<mz_expr::WindowFrame, PlanError> {
5898 use mz_expr::WindowFrameBound::*;
5899 let units = window_frame_unit_ast_to_expr(units)?;
5900 let start_bound = window_frame_bound_ast_to_expr(start_bound);
5901 let end_bound = end_bound
5902 .as_ref()
5903 .map(window_frame_bound_ast_to_expr)
5904 .unwrap_or(CurrentRow);
5905
5906 match (&start_bound, &end_bound) {
5908 (UnboundedFollowing, _) => {
5910 sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5911 }
5912 (_, UnboundedPreceding) => {
5914 sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5915 }
5916 (CurrentRow, OffsetPreceding(_)) => {
5918 sql_bail!("frame starting from current row cannot have preceding rows")
5919 }
5920 (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5921 sql_bail!("frame starting from following row cannot have preceding rows")
5922 }
5923 (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5926 if *o1 > 1000000 || *o2 > 1000000 {
5930 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5931 }
5932 }
5933 (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5934 if *o1 > 1000000 || *o2 > 1000000 {
5935 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5936 }
5937 }
5938 (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5939 if *o1 > 1000000 || *o2 > 1000000 {
5940 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5941 }
5942 }
5943 (OffsetPreceding(o), CurrentRow) => {
5944 if *o > 1000000 {
5945 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5946 }
5947 }
5948 (CurrentRow, OffsetFollowing(o)) => {
5949 if *o > 1000000 {
5950 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5951 }
5952 }
5953 (_, _) => (),
5955 }
5956
5957 if units == mz_expr::WindowFrameUnits::Range
5960 && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5961 {
5962 bail_unsupported!("RANGE in non-default window frames")
5963 }
5964
5965 let frame = mz_expr::WindowFrame {
5966 units,
5967 start_bound,
5968 end_bound,
5969 };
5970 Ok(frame)
5971}
5972
5973fn window_frame_unit_ast_to_expr(
5974 unit: &WindowFrameUnits,
5975) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5976 match unit {
5977 WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5978 WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5979 WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5980 }
5981}
5982
5983fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5984 match bound {
5985 WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5986 WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5987 WindowFrameBound::Preceding(Some(offset)) => {
5988 mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5989 }
5990 WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
5991 WindowFrameBound::Following(Some(offset)) => {
5992 mz_expr::WindowFrameBound::OffsetFollowing(*offset)
5993 }
5994 }
5995}
5996
5997pub fn scalar_type_from_sql(
5998 scx: &StatementContext,
5999 data_type: &ResolvedDataType,
6000) -> Result<SqlScalarType, PlanError> {
6001 match data_type {
6002 ResolvedDataType::AnonymousList(elem_type) => {
6003 let elem_type = scalar_type_from_sql(scx, elem_type)?;
6004 if matches!(elem_type, SqlScalarType::Char { .. }) {
6005 bail_unsupported!("char list");
6006 }
6007 Ok(SqlScalarType::List {
6008 element_type: Box::new(elem_type),
6009 custom_id: None,
6010 })
6011 }
6012 ResolvedDataType::AnonymousMap {
6013 key_type,
6014 value_type,
6015 } => {
6016 match scalar_type_from_sql(scx, key_type)? {
6017 SqlScalarType::String => {}
6018 other => sql_bail!(
6019 "map key type must be {}, got {}",
6020 scx.humanize_scalar_type(&SqlScalarType::String, false),
6021 scx.humanize_scalar_type(&other, false)
6022 ),
6023 }
6024 Ok(SqlScalarType::Map {
6025 value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
6026 custom_id: None,
6027 })
6028 }
6029 ResolvedDataType::Named { id, modifiers, .. } => {
6030 scalar_type_from_catalog(scx.catalog, *id, modifiers)
6031 }
6032 ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
6033 }
6034}
6035
6036pub fn scalar_type_from_catalog(
6037 catalog: &dyn SessionCatalog,
6038 id: CatalogItemId,
6039 modifiers: &[i64],
6040) -> Result<SqlScalarType, PlanError> {
6041 let entry = catalog.get_item(&id);
6042 let type_details = match entry.type_details() {
6043 Some(type_details) => type_details,
6044 None => {
6045 sql_bail!(
6048 "internal error: {} does not refer to a type",
6049 catalog.resolve_full_name(entry.name()).to_string().quoted()
6050 );
6051 }
6052 };
6053 match &type_details.typ {
6054 CatalogType::Numeric => {
6055 let mut modifiers = modifiers.iter().fuse();
6056 let precision = match modifiers.next() {
6057 Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6058 sql_bail!(
6059 "precision for type numeric must be between 1 and {}",
6060 NUMERIC_DATUM_MAX_PRECISION,
6061 );
6062 }
6063 Some(p) => Some(*p),
6064 None => None,
6065 };
6066 let scale = match modifiers.next() {
6067 Some(scale) => {
6068 if let Some(precision) = precision {
6069 if *scale > precision {
6070 sql_bail!(
6071 "scale for type numeric must be between 0 and precision {}",
6072 precision
6073 );
6074 }
6075 }
6076 Some(NumericMaxScale::try_from(*scale)?)
6077 }
6078 None => None,
6079 };
6080 if modifiers.next().is_some() {
6081 sql_bail!("type numeric supports at most two type modifiers");
6082 }
6083 Ok(SqlScalarType::Numeric { max_scale: scale })
6084 }
6085 CatalogType::Char => {
6086 let mut modifiers = modifiers.iter().fuse();
6087 let length = match modifiers.next() {
6088 Some(l) => Some(CharLength::try_from(*l)?),
6089 None => Some(CharLength::ONE),
6090 };
6091 if modifiers.next().is_some() {
6092 sql_bail!("type character supports at most one type modifier");
6093 }
6094 Ok(SqlScalarType::Char { length })
6095 }
6096 CatalogType::VarChar => {
6097 let mut modifiers = modifiers.iter().fuse();
6098 let length = match modifiers.next() {
6099 Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6100 None => None,
6101 };
6102 if modifiers.next().is_some() {
6103 sql_bail!("type character varying supports at most one type modifier");
6104 }
6105 Ok(SqlScalarType::VarChar { max_length: length })
6106 }
6107 CatalogType::Timestamp => {
6108 let mut modifiers = modifiers.iter().fuse();
6109 let precision = match modifiers.next() {
6110 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6111 None => None,
6112 };
6113 if modifiers.next().is_some() {
6114 sql_bail!("type timestamp supports at most one type modifier");
6115 }
6116 Ok(SqlScalarType::Timestamp { precision })
6117 }
6118 CatalogType::TimestampTz => {
6119 let mut modifiers = modifiers.iter().fuse();
6120 let precision = match modifiers.next() {
6121 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6122 None => None,
6123 };
6124 if modifiers.next().is_some() {
6125 sql_bail!("type timestamp with time zone supports at most one type modifier");
6126 }
6127 Ok(SqlScalarType::TimestampTz { precision })
6128 }
6129 t => {
6130 if !modifiers.is_empty() {
6131 sql_bail!(
6132 "{} does not support type modifiers",
6133 catalog.resolve_full_name(entry.name()).to_string()
6134 );
6135 }
6136 match t {
6137 CatalogType::Array {
6138 element_reference: element_id,
6139 } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6140 catalog,
6141 *element_id,
6142 modifiers,
6143 )?))),
6144 CatalogType::List {
6145 element_reference: element_id,
6146 element_modifiers,
6147 } => Ok(SqlScalarType::List {
6148 element_type: Box::new(scalar_type_from_catalog(
6149 catalog,
6150 *element_id,
6151 element_modifiers,
6152 )?),
6153 custom_id: Some(id),
6154 }),
6155 CatalogType::Map {
6156 key_reference: _,
6157 key_modifiers: _,
6158 value_reference: value_id,
6159 value_modifiers,
6160 } => Ok(SqlScalarType::Map {
6161 value_type: Box::new(scalar_type_from_catalog(
6162 catalog,
6163 *value_id,
6164 value_modifiers,
6165 )?),
6166 custom_id: Some(id),
6167 }),
6168 CatalogType::Range {
6169 element_reference: element_id,
6170 } => Ok(SqlScalarType::Range {
6171 element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6172 }),
6173 CatalogType::Record { fields } => {
6174 let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6175 .iter()
6176 .map(|f| {
6177 let scalar_type = scalar_type_from_catalog(
6178 catalog,
6179 f.type_reference,
6180 &f.type_modifiers,
6181 )?;
6182 Ok((
6183 f.name.clone(),
6184 SqlColumnType {
6185 scalar_type,
6186 nullable: true,
6187 },
6188 ))
6189 })
6190 .collect::<Result<Box<_>, PlanError>>()?;
6191 Ok(SqlScalarType::Record {
6192 fields: scalars,
6193 custom_id: Some(id),
6194 })
6195 }
6196 CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6197 CatalogType::Bool => Ok(SqlScalarType::Bool),
6198 CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6199 CatalogType::Date => Ok(SqlScalarType::Date),
6200 CatalogType::Float32 => Ok(SqlScalarType::Float32),
6201 CatalogType::Float64 => Ok(SqlScalarType::Float64),
6202 CatalogType::Int16 => Ok(SqlScalarType::Int16),
6203 CatalogType::Int32 => Ok(SqlScalarType::Int32),
6204 CatalogType::Int64 => Ok(SqlScalarType::Int64),
6205 CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6206 CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6207 CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6208 CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6209 CatalogType::Interval => Ok(SqlScalarType::Interval),
6210 CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6211 CatalogType::Oid => Ok(SqlScalarType::Oid),
6212 CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6213 CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6214 CatalogType::Pseudo => {
6215 sql_bail!(
6216 "cannot reference pseudo type {}",
6217 catalog.resolve_full_name(entry.name()).to_string()
6218 )
6219 }
6220 CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6221 CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6222 CatalogType::RegType => Ok(SqlScalarType::RegType),
6223 CatalogType::String => Ok(SqlScalarType::String),
6224 CatalogType::Time => Ok(SqlScalarType::Time),
6225 CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6226 CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6227 CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6228 CatalogType::Numeric => unreachable!("handled above"),
6229 CatalogType::Char => unreachable!("handled above"),
6230 CatalogType::VarChar => unreachable!("handled above"),
6231 CatalogType::Timestamp => unreachable!("handled above"),
6232 CatalogType::TimestampTz => unreachable!("handled above"),
6233 }
6234 }
6235 }
6236}
6237
6238struct AggregateTableFuncVisitor<'a> {
6241 scx: &'a StatementContext<'a>,
6242 aggs: Vec<Function<Aug>>,
6243 within_aggregate: bool,
6244 tables: BTreeMap<Function<Aug>, String>,
6245 table_disallowed_context: Vec<&'static str>,
6246 in_select_item: bool,
6247 id_gen: IdGen,
6248 err: Option<PlanError>,
6249}
6250
6251impl<'a> AggregateTableFuncVisitor<'a> {
6252 fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6253 AggregateTableFuncVisitor {
6254 scx,
6255 aggs: Vec::new(),
6256 within_aggregate: false,
6257 tables: BTreeMap::new(),
6258 table_disallowed_context: Vec::new(),
6259 in_select_item: false,
6260 id_gen: Default::default(),
6261 err: None,
6262 }
6263 }
6264
6265 fn into_result(
6266 self,
6267 ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6268 match self.err {
6269 Some(err) => Err(err),
6270 None => {
6271 let mut seen = BTreeSet::new();
6274 let aggs = self
6275 .aggs
6276 .into_iter()
6277 .filter(move |agg| seen.insert(agg.clone()))
6278 .collect();
6279 Ok((aggs, self.tables))
6280 }
6281 }
6282 }
6283}
6284
6285impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
6286 fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
6287 let item = match self.scx.get_item_by_resolved_name(&func.name) {
6288 Ok(i) => i,
6289 Err(_) => return,
6291 };
6292
6293 match item.func() {
6294 Ok(Func::Aggregate { .. }) if func.over.is_none() => {
6297 if self.within_aggregate {
6298 self.err = Some(sql_err!("nested aggregate functions are not allowed",));
6299 return;
6300 }
6301 self.aggs.push(func.clone());
6302 let Function {
6303 name: _,
6304 args,
6305 filter,
6306 over: _,
6307 distinct: _,
6308 } = func;
6309 if let Some(filter) = filter {
6310 self.visit_expr_mut(filter);
6311 }
6312 let old_within_aggregate = self.within_aggregate;
6313 self.within_aggregate = true;
6314 self.table_disallowed_context
6315 .push("aggregate function calls");
6316
6317 self.visit_function_args_mut(args);
6318
6319 self.within_aggregate = old_within_aggregate;
6320 self.table_disallowed_context.pop();
6321 }
6322 Ok(Func::Table { .. }) => {
6323 self.table_disallowed_context.push("other table functions");
6324 visit_mut::visit_function_mut(self, func);
6325 self.table_disallowed_context.pop();
6326 }
6327 _ => visit_mut::visit_function_mut(self, func),
6328 }
6329 }
6330
6331 fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
6332 }
6334
6335 fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
6336 let (disallowed_context, func) = match expr {
6337 Expr::Case { .. } => (Some("CASE"), None),
6338 Expr::HomogenizingFunction {
6339 function: HomogenizingFunction::Coalesce,
6340 ..
6341 } => (Some("COALESCE"), None),
6342 Expr::Function(func) if self.in_select_item => {
6343 let mut table_func = None;
6346 if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
6347 if let Ok(Func::Table { .. }) = item.func() {
6348 if let Some(context) = self.table_disallowed_context.last() {
6349 self.err = Some(sql_err!(
6350 "table functions are not allowed in {} (function {})",
6351 context,
6352 func.name
6353 ));
6354 return;
6355 }
6356 table_func = Some(func.clone());
6357 }
6358 }
6359 (None, table_func)
6362 }
6363 _ => (None, None),
6364 };
6365 if let Some(func) = func {
6366 visit_mut::visit_expr_mut(self, expr);
6368 if let Function {
6370 name: _,
6371 args: _,
6372 filter: None,
6373 over: None,
6374 distinct: false,
6375 } = &func
6376 {
6377 let unique_id = self.id_gen.allocate_id();
6379 let id = self
6380 .tables
6381 .entry(func)
6382 .or_insert_with(|| format!("table_func_{unique_id}"));
6383 *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
6386 }
6387 }
6388 if let Some(context) = disallowed_context {
6389 self.table_disallowed_context.push(context);
6390 }
6391
6392 visit_mut::visit_expr_mut(self, expr);
6393
6394 if disallowed_context.is_some() {
6395 self.table_disallowed_context.pop();
6396 }
6397 }
6398
6399 fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
6400 let old = self.in_select_item;
6401 self.in_select_item = true;
6402 visit_mut::visit_select_item_mut(self, si);
6403 self.in_select_item = old;
6404 }
6405}
6406
6407#[derive(Default)]
6408struct WindowFuncCollector {
6409 window_funcs: Vec<Expr<Aug>>,
6410}
6411
6412impl WindowFuncCollector {
6413 fn into_result(self) -> Vec<Expr<Aug>> {
6414 let mut seen = BTreeSet::new();
6416 let window_funcs_dedupped = self
6417 .window_funcs
6418 .into_iter()
6419 .filter(move |expr| seen.insert(expr.clone()))
6420 .rev()
6423 .collect();
6424 window_funcs_dedupped
6425 }
6426}
6427
6428impl Visit<'_, Aug> for WindowFuncCollector {
6429 fn visit_expr(&mut self, expr: &Expr<Aug>) {
6430 match expr {
6431 Expr::Function(func) => {
6432 if func.over.is_some() {
6433 self.window_funcs.push(expr.clone());
6434 }
6435 }
6436 _ => (),
6437 }
6438 visit::visit_expr(self, expr);
6439 }
6440
6441 fn visit_query(&mut self, _query: &Query<Aug>) {
6442 }
6444}
6445
6446#[derive(Debug, Eq, PartialEq, Copy, Clone)]
6448pub enum QueryLifetime {
6449 OneShot,
6451 Index,
6453 MaterializedView,
6455 Subscribe,
6457 View,
6459 Source,
6461}
6462
6463impl QueryLifetime {
6464 pub fn is_one_shot(&self) -> bool {
6468 let result = match self {
6469 QueryLifetime::OneShot => true,
6470 QueryLifetime::Index => false,
6471 QueryLifetime::MaterializedView => false,
6472 QueryLifetime::Subscribe => false,
6473 QueryLifetime::View => false,
6474 QueryLifetime::Source => false,
6475 };
6476 assert_eq!(!result, self.is_maintained());
6477 result
6478 }
6479
6480 pub fn is_maintained(&self) -> bool {
6483 match self {
6484 QueryLifetime::OneShot => false,
6485 QueryLifetime::Index => true,
6486 QueryLifetime::MaterializedView => true,
6487 QueryLifetime::Subscribe => true,
6488 QueryLifetime::View => true,
6489 QueryLifetime::Source => true,
6490 }
6491 }
6492
6493 pub fn allow_show(&self) -> bool {
6495 match self {
6496 QueryLifetime::OneShot => true,
6497 QueryLifetime::Index => false,
6498 QueryLifetime::MaterializedView => false,
6499 QueryLifetime::Subscribe => true, QueryLifetime::View => false,
6501 QueryLifetime::Source => false,
6502 }
6503 }
6504}
6505
6506#[derive(Debug, Clone)]
6508pub struct CteDesc {
6509 pub name: String,
6510 pub desc: RelationDesc,
6511}
6512
6513#[derive(Debug, Clone)]
6515pub struct QueryContext<'a> {
6516 pub scx: &'a StatementContext<'a>,
6518 pub lifetime: QueryLifetime,
6520 pub outer_scopes: Vec<Scope>,
6522 pub outer_relation_types: Vec<SqlRelationType>,
6524 pub ctes: BTreeMap<LocalId, CteDesc>,
6526 pub name_manager: Rc<RefCell<NameManager>>,
6528 pub recursion_guard: RecursionGuard,
6529}
6530
6531impl CheckedRecursion for QueryContext<'_> {
6532 fn recursion_guard(&self) -> &RecursionGuard {
6533 &self.recursion_guard
6534 }
6535}
6536
6537impl<'a> QueryContext<'a> {
6538 pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6539 QueryContext {
6540 scx,
6541 lifetime,
6542 outer_scopes: vec![],
6543 outer_relation_types: vec![],
6544 ctes: BTreeMap::new(),
6545 name_manager: Rc::new(RefCell::new(NameManager::new())),
6546 recursion_guard: RecursionGuard::with_limit(1024), }
6548 }
6549
6550 fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6551 expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6552 }
6553
6554 fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6557 let ctes = self.ctes.clone();
6558 let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6559 let outer_relation_types = iter::once(relation_type)
6560 .chain(self.outer_relation_types.clone())
6561 .collect();
6562 let name_manager = Rc::clone(&self.name_manager);
6564
6565 QueryContext {
6566 scx: self.scx,
6567 lifetime: self.lifetime,
6568 outer_scopes,
6569 outer_relation_types,
6570 ctes,
6571 name_manager,
6572 recursion_guard: self.recursion_guard.clone(),
6573 }
6574 }
6575
6576 fn empty_derived_context(&self) -> QueryContext<'a> {
6578 let scope = Scope::empty();
6579 let ty = SqlRelationType::empty();
6580 self.derived_context(scope, ty)
6581 }
6582
6583 pub fn resolve_table_name(
6586 &self,
6587 object: ResolvedItemName,
6588 ) -> Result<(HirRelationExpr, Scope), PlanError> {
6589 match object {
6590 ResolvedItemName::Item {
6591 id,
6592 full_name,
6593 version,
6594 ..
6595 } => {
6596 let item = self.scx.get_item(&id).at_version(version);
6597 let desc = match item.relation_desc() {
6598 Some(desc) => desc.clone(),
6599 None => {
6600 return Err(PlanError::InvalidDependency {
6601 name: full_name.to_string(),
6602 item_type: item.item_type().to_string(),
6603 });
6604 }
6605 };
6606 let expr = HirRelationExpr::Get {
6607 id: Id::Global(item.global_id()),
6608 typ: desc.typ().clone(),
6609 };
6610
6611 let name = full_name.into();
6612 let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6613
6614 Ok((expr, scope))
6615 }
6616 ResolvedItemName::Cte { id, name } => {
6617 let name = name.into();
6618 let cte = self.ctes.get(&id).unwrap();
6619 let expr = HirRelationExpr::Get {
6620 id: Id::Local(id),
6621 typ: cte.desc.typ().clone(),
6622 };
6623
6624 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6625
6626 Ok((expr, scope))
6627 }
6628 ResolvedItemName::ContinualTask { id, name } => {
6629 let cte = self.ctes.get(&id).unwrap();
6630 let expr = HirRelationExpr::Get {
6631 id: Id::Local(id),
6632 typ: cte.desc.typ().clone(),
6633 };
6634
6635 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6636
6637 Ok((expr, scope))
6638 }
6639 ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6640 }
6641 }
6642
6643 pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6646 self.scx.humanize_scalar_type(typ, postgres_compat)
6647 }
6648}
6649
6650#[derive(Debug, Clone)]
6652pub struct ExprContext<'a> {
6653 pub qcx: &'a QueryContext<'a>,
6654 pub name: &'a str,
6656 pub scope: &'a Scope,
6659 pub relation_type: &'a SqlRelationType,
6662 pub allow_aggregates: bool,
6664 pub allow_subqueries: bool,
6666 pub allow_parameters: bool,
6668 pub allow_windows: bool,
6670}
6671
6672impl CheckedRecursion for ExprContext<'_> {
6673 fn recursion_guard(&self) -> &RecursionGuard {
6674 &self.qcx.recursion_guard
6675 }
6676}
6677
6678impl<'a> ExprContext<'a> {
6679 pub fn catalog(&self) -> &dyn SessionCatalog {
6680 self.qcx.scx.catalog
6681 }
6682
6683 pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6684 let mut ecx = self.clone();
6685 ecx.name = name;
6686 ecx
6687 }
6688
6689 pub fn column_type<E>(&self, expr: &E) -> E::Type
6690 where
6691 E: AbstractExpr,
6692 {
6693 expr.typ(
6694 &self.qcx.outer_relation_types,
6695 self.relation_type,
6696 &self.qcx.scx.param_types.borrow(),
6697 )
6698 }
6699
6700 pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6701 where
6702 E: AbstractExpr,
6703 {
6704 self.column_type(expr).scalar_type()
6705 }
6706
6707 fn derived_query_context(&self) -> QueryContext<'_> {
6708 let mut scope = self.scope.clone();
6709 scope.lateral_barrier = true;
6710 self.qcx.derived_context(scope, self.relation_type.clone())
6711 }
6712
6713 pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6714 self.qcx.scx.require_feature_flag(flag)
6715 }
6716
6717 pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6718 &self.qcx.scx.param_types
6719 }
6720
6721 pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6724 self.qcx.scx.humanize_scalar_type(typ, postgres_compat)
6725 }
6726
6727 pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6728 self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6729 }
6730}
6731
6732#[derive(Debug, Clone)]
6738pub struct NameManager(BTreeSet<Arc<str>>);
6739
6740impl NameManager {
6741 pub fn new() -> Self {
6743 Self(BTreeSet::new())
6744 }
6745
6746 fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6749 let s = s.as_ref();
6750 if let Some(interned) = self.0.get(s) {
6751 Arc::clone(interned)
6752 } else {
6753 let interned: Arc<str> = Arc::from(s);
6754 self.0.insert(Arc::clone(&interned));
6755 interned
6756 }
6757 }
6758
6759 pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6762 self.intern(item.column_name.as_str())
6778 }
6779}
6780
6781#[cfg(test)]
6782mod test {
6783 use super::*;
6784
6785 #[mz_ore::test]
6790 pub fn test_name_manager_string_interning() {
6791 let mut nm = NameManager::new();
6792
6793 let orig_hi = "hi";
6794 let hi = nm.intern(orig_hi);
6795 let hello = nm.intern("hello");
6796
6797 assert_ne!(hi.as_ptr(), hello.as_ptr());
6798
6799 let hi2 = nm.intern("hi");
6801 assert_eq!(hi.as_ptr(), hi2.as_ptr());
6802
6803 let s = format!(
6805 "{}{}",
6806 hi.chars().nth(0).unwrap(),
6807 hi2.chars().nth(1).unwrap()
6808 );
6809 assert_ne!(orig_hi.as_ptr(), s.as_ptr());
6811
6812 let hi3 = nm.intern(s);
6813 assert_eq!(hi.as_ptr(), hi3.as_ptr());
6814 }
6815}