1use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::virtual_syntax::AlgExcept;
50use mz_expr::{
51 Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, TableFunc,
52 func as expr_func,
53};
54use mz_ore::assert_none;
55use mz_ore::collections::CollectionExt;
56use mz_ore::error::ErrorExt;
57use mz_ore::id_gen::IdGen;
58use mz_ore::option::FallibleMapExt;
59use mz_ore::stack::{CheckedRecursion, RecursionGuard};
60use mz_ore::str::StrExt;
61use mz_repr::adt::char::CharLength;
62use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
63use mz_repr::adt::timestamp::TimestampPrecision;
64use mz_repr::adt::varchar::VarCharMaxLength;
65use mz_repr::{
66 CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector, Row,
67 RowArena, SqlColumnType, SqlRelationType, SqlScalarType, UNKNOWN_COLUMN_NAME, strconv,
68};
69use mz_sql_parser::ast::display::AstDisplay;
70use mz_sql_parser::ast::visit::Visit;
71use mz_sql_parser::ast::visit_mut::{self, VisitMut};
72use mz_sql_parser::ast::{
73 AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
74 CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
75 Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
76 JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
77 MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
78 SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
79 TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
80 WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
81};
82use mz_sql_parser::ident;
83
84use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
85use crate::func::{self, Func, FuncSpec, TableFuncImpl};
86use crate::names::{
87 Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
88};
89use crate::plan::PlanError::InvalidWmrRecursionLimit;
90use crate::plan::error::PlanError;
91use crate::plan::hir::{
92 AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
93 BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
94 HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
95 ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
96};
97use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
98use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
99use crate::plan::statement::{StatementContext, StatementDesc, show};
100use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
101use crate::plan::{
102 Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
103 literal, transform_ast,
104};
105use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
106use crate::session::vars::{self, FeatureFlag};
107use crate::{ORDINALITY_COL_NAME, normalize};
108
109#[derive(Debug)]
110pub struct PlannedRootQuery<E> {
111 pub expr: E,
112 pub desc: RelationDesc,
113 pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
114 pub scope: Scope,
115}
116
117#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
126pub fn plan_root_query(
127 scx: &StatementContext,
128 mut query: Query<Aug>,
129 lifetime: QueryLifetime,
130) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
131 transform_ast::transform(scx, &mut query)?;
132 let mut qcx = QueryContext::root(scx, lifetime);
133 let PlannedQuery {
134 mut expr,
135 scope,
136 order_by,
137 limit,
138 offset,
139 project,
140 group_size_hints,
141 } = plan_query(&mut qcx, &query)?;
142
143 let mut finishing = RowSetFinishing {
144 limit,
145 offset,
146 project,
147 order_by,
148 };
149
150 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
156
157 if lifetime.is_maintained() {
158 expr.finish_maintained(&mut finishing, group_size_hints);
159 }
160
161 let typ = qcx.relation_type(&expr);
162 let typ = SqlRelationType::new(
163 finishing
164 .project
165 .iter()
166 .map(|i| typ.column_types[*i].clone())
167 .collect(),
168 );
169 let desc = RelationDesc::new(typ, scope.column_names());
170
171 Ok(PlannedRootQuery {
172 expr,
173 desc,
174 finishing,
175 scope,
176 })
177}
178
179#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
181pub fn plan_ct_query(
182 qcx: &mut QueryContext,
183 mut query: Query<Aug>,
184) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
185 transform_ast::transform(qcx.scx, &mut query)?;
186 let PlannedQuery {
187 mut expr,
188 scope,
189 order_by,
190 limit,
191 offset,
192 project,
193 group_size_hints,
194 } = plan_query(qcx, &query)?;
195
196 let mut finishing = RowSetFinishing {
197 limit,
198 offset,
199 project,
200 order_by,
201 };
202
203 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
209
210 expr.finish_maintained(&mut finishing, group_size_hints);
211
212 let typ = qcx.relation_type(&expr);
213 let typ = SqlRelationType::new(
214 finishing
215 .project
216 .iter()
217 .map(|i| typ.column_types[*i].clone())
218 .collect(),
219 );
220 let desc = RelationDesc::new(typ, scope.column_names());
221
222 Ok(PlannedRootQuery {
223 expr,
224 desc,
225 finishing,
226 scope,
227 })
228}
229
230fn try_push_projection_order_by(
241 expr: &mut HirRelationExpr,
242 project: &mut Vec<usize>,
243 order_by: &mut Vec<ColumnOrder>,
244) -> bool {
245 let mut unproject = vec![None; expr.arity()];
246 for (out_i, in_i) in project.iter().copied().enumerate() {
247 unproject[in_i] = Some(out_i);
248 }
249 if order_by
250 .iter()
251 .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
252 {
253 let trivial_project = (0..project.len()).collect();
254 *expr = expr.take().project(mem::replace(project, trivial_project));
255 for ob in order_by {
256 ob.column = unproject[ob.column].unwrap();
257 }
258 true
259 } else {
260 false
261 }
262}
263
264pub fn plan_insert_query(
265 scx: &StatementContext,
266 table_name: ResolvedItemName,
267 columns: Vec<Ident>,
268 source: InsertSource<Aug>,
269 returning: Vec<SelectItem<Aug>>,
270) -> Result<
271 (
272 CatalogItemId,
273 HirRelationExpr,
274 PlannedRootQuery<Vec<HirScalarExpr>>,
275 ),
276 PlanError,
277> {
278 let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
279 let table = scx.get_item_by_resolved_name(&table_name)?;
280
281 if table.item_type() != CatalogItemType::Table {
283 sql_bail!(
284 "cannot insert into {} '{}'",
285 table.item_type(),
286 table_name.full_name_str()
287 );
288 }
289 let desc = table.desc(&scx.catalog.resolve_full_name(table.name()))?;
290 let mut defaults = table
291 .writable_table_details()
292 .ok_or_else(|| {
293 sql_err!(
294 "cannot insert into non-writeable table '{}'",
295 table_name.full_name_str()
296 )
297 })?
298 .to_vec();
299
300 for default in &mut defaults {
301 transform_ast::transform(scx, default)?;
302 }
303
304 if table.id().is_system() {
305 sql_bail!(
306 "cannot insert into system table '{}'",
307 table_name.full_name_str()
308 );
309 }
310
311 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
312
313 let mut source_types = Vec::with_capacity(columns.len());
315 let mut ordering = Vec::with_capacity(columns.len());
316
317 if columns.is_empty() {
318 source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
321 ordering.extend(0..desc.arity());
322 } else {
323 let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
324 .iter()
325 .enumerate()
326 .map(|(idx, (name, typ))| (name, (idx, typ)))
327 .collect();
328
329 for c in &columns {
330 if let Some((idx, typ)) = column_by_name.get(c) {
331 ordering.push(*idx);
332 source_types.push(&typ.scalar_type);
333 } else {
334 sql_bail!(
335 "column {} of relation {} does not exist",
336 c.quoted(),
337 table_name.full_name_str().quoted()
338 );
339 }
340 }
341 if let Some(dup) = columns.iter().duplicates().next() {
342 sql_bail!("column {} specified more than once", dup.quoted());
343 }
344 };
345
346 let expr = match source {
348 InsertSource::Query(mut query) => {
349 transform_ast::transform(scx, &mut query)?;
350
351 match query {
352 Query {
354 body: SetExpr::Values(Values(values)),
355 ctes,
356 order_by,
357 limit: None,
358 offset: None,
359 } if ctes.is_empty() && order_by.is_empty() => {
360 let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
361 plan_values_insert(&qcx, &names, &source_types, &values)?
362 }
363 _ => {
364 let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
365 expr
366 }
367 }
368 }
369 InsertSource::DefaultValues => {
370 HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
371 }
372 };
373
374 let expr_arity = expr.arity();
375
376 let max_columns = if columns.is_empty() {
379 desc.arity()
380 } else {
381 columns.len()
382 };
383 if expr_arity > max_columns {
384 sql_bail!("INSERT has more expressions than target columns");
385 }
386 if expr_arity < columns.len() {
388 sql_bail!("INSERT has more target columns than expressions");
389 }
390
391 source_types.truncate(expr_arity);
393 ordering.truncate(expr_arity);
394
395 let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
398 sql_err!(
399 "column {} is of type {} but expression is of type {}",
400 desc.get_name(ordering[e.column]).quoted(),
401 qcx.humanize_scalar_type(&e.target_type, false),
402 qcx.humanize_scalar_type(&e.source_type, false),
403 )
404 })?;
405
406 let mut map_exprs = vec![];
408 let mut project_key = Vec::with_capacity(desc.arity());
409
410 let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();
412
413 let column_details = desc.iter_types().zip_eq(defaults).enumerate();
414 for (col_idx, (col_typ, default)) in column_details {
415 if let Some(src_idx) = col_to_source.get(&col_idx) {
416 project_key.push(*src_idx);
417 } else {
418 let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
419 project_key.push(expr_arity + map_exprs.len());
420 map_exprs.push(hir);
421 }
422 }
423
424 let returning = {
425 let (scope, typ) = if let ResolvedItemName::Item {
426 full_name,
427 version: _,
428 ..
429 } = table_name
430 {
431 let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
432 let typ = desc.typ().clone();
433 (scope, typ)
434 } else {
435 (Scope::empty(), SqlRelationType::empty())
436 };
437 let ecx = &ExprContext {
438 qcx: &qcx,
439 name: "RETURNING clause",
440 scope: &scope,
441 relation_type: &typ,
442 allow_aggregates: false,
443 allow_subqueries: false,
444 allow_parameters: true,
445 allow_windows: false,
446 };
447 let table_func_names = BTreeMap::new();
448 let mut output_columns = vec![];
449 let mut new_exprs = vec![];
450 let mut new_type = SqlRelationType::empty();
451 for mut si in returning {
452 transform_ast::transform(scx, &mut si)?;
453 for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
454 let expr = match &select_item {
455 ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
456 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
457 };
458 output_columns.push(column_name);
459 let typ = ecx.column_type(&expr);
460 new_type.column_types.push(typ);
461 new_exprs.push(expr);
462 }
463 }
464 let desc = RelationDesc::new(new_type, output_columns);
465 let desc_arity = desc.arity();
466 PlannedRootQuery {
467 expr: new_exprs,
468 desc,
469 finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
470 scope,
471 }
472 };
473
474 Ok((
475 table.id(),
476 expr.map(map_exprs).project(project_key),
477 returning,
478 ))
479}
480
481pub fn plan_copy_item(
492 scx: &StatementContext,
493 item_name: ResolvedItemName,
494 columns: Vec<Ident>,
495) -> Result<
496 (
497 CatalogItemId,
498 RelationDesc,
499 Vec<ColumnIndex>,
500 Option<MapFilterProject>,
501 ),
502 PlanError,
503> {
504 let item = scx.get_item_by_resolved_name(&item_name)?;
505 let fullname = scx.catalog.resolve_full_name(item.name());
506 let table_desc = item.desc(&fullname)?.into_owned();
507 let mut ordering = Vec::with_capacity(columns.len());
508
509 let mfp = if let Some(table_defaults) = item.writable_table_details() {
519 let mut table_defaults = table_defaults.to_vec();
520
521 for default in &mut table_defaults {
522 transform_ast::transform(scx, default)?;
523 }
524
525 let source_column_names: Vec<_> = columns
527 .iter()
528 .cloned()
529 .map(normalize::column_name)
530 .collect();
531
532 let mut default_exprs = Vec::new();
533 let mut project_keys = Vec::with_capacity(table_desc.arity());
534
535 let column_details = table_desc.iter().zip_eq(table_defaults);
538 for ((col_name, col_type), col_default) in column_details {
539 let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
540 if let Some(src_idx) = maybe_src_idx {
541 project_keys.push(src_idx);
542 } else {
543 let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
546 let mir = hir.lower_uncorrelated()?;
547 project_keys.push(source_column_names.len() + default_exprs.len());
548 default_exprs.push(mir);
549 }
550 }
551
552 let mfp = MapFilterProject::new(source_column_names.len())
553 .map(default_exprs)
554 .project(project_keys);
555 Some(mfp)
556 } else {
557 None
558 };
559
560 let source_desc = if columns.is_empty() {
562 let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
563 ordering.extend(indexes);
564
565 table_desc
567 } else {
568 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
569 let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
570 .iter_all()
571 .map(|(idx, name, typ)| (name, (*idx, typ)))
572 .collect();
573
574 let mut names = Vec::with_capacity(columns.len());
575 let mut source_types = Vec::with_capacity(columns.len());
576
577 for c in &columns {
578 if let Some((idx, typ)) = column_by_name.get(c) {
579 ordering.push(*idx);
580 source_types.push((*typ).clone());
581 names.push(c.clone());
582 } else {
583 sql_bail!(
584 "column {} of relation {} does not exist",
585 c.quoted(),
586 item_name.full_name_str().quoted()
587 );
588 }
589 }
590 if let Some(dup) = columns.iter().duplicates().next() {
591 sql_bail!("column {} specified more than once", dup.quoted());
592 }
593
594 RelationDesc::new(SqlRelationType::new(source_types), names)
596 };
597
598 Ok((item.id(), source_desc, ordering, mfp))
599}
600
601pub fn plan_copy_from(
605 scx: &StatementContext,
606 table_name: ResolvedItemName,
607 columns: Vec<Ident>,
608) -> Result<
609 (
610 CatalogItemId,
611 RelationDesc,
612 Vec<ColumnIndex>,
613 Option<MapFilterProject>,
614 ),
615 PlanError,
616> {
617 let table = scx.get_item_by_resolved_name(&table_name)?;
618
619 if table.item_type() != CatalogItemType::Table {
621 sql_bail!(
622 "cannot insert into {} '{}'",
623 table.item_type(),
624 table_name.full_name_str()
625 );
626 }
627
628 let _ = table.writable_table_details().ok_or_else(|| {
629 sql_err!(
630 "cannot insert into non-writeable table '{}'",
631 table_name.full_name_str()
632 )
633 })?;
634
635 if table.id().is_system() {
636 sql_bail!(
637 "cannot insert into system table '{}'",
638 table_name.full_name_str()
639 );
640 }
641 let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
642
643 Ok((id, desc, ordering, mfp))
644}
645
646pub fn plan_copy_from_rows(
649 pcx: &PlanContext,
650 catalog: &dyn SessionCatalog,
651 target_id: CatalogItemId,
652 target_name: String,
653 columns: Vec<ColumnIndex>,
654 rows: Vec<mz_repr::Row>,
655) -> Result<HirRelationExpr, PlanError> {
656 let scx = StatementContext::new(Some(pcx), catalog);
657
658 let table = catalog
660 .try_get_item(&target_id)
661 .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
662 .at_version(RelationVersionSelector::Latest);
663 let desc = table.desc(&catalog.resolve_full_name(table.name()))?;
664
665 let mut defaults = table
666 .writable_table_details()
667 .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
668 .to_vec();
669
670 for default in &mut defaults {
671 transform_ast::transform(&scx, default)?;
672 }
673
674 let column_types = columns
675 .iter()
676 .map(|x| desc.get_type(x).clone())
677 .map(|mut x| {
678 x.nullable = true;
681 x
682 })
683 .collect();
684 let typ = SqlRelationType::new(column_types);
685 let expr = HirRelationExpr::Constant {
686 rows,
687 typ: typ.clone(),
688 };
689
690 let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
696 if columns == default {
697 return Ok(expr);
698 }
699
700 let mut map_exprs = vec![];
702 let mut project_key = Vec::with_capacity(desc.arity());
703
704 let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
706
707 let column_details = desc.iter_all().zip_eq(defaults);
708 for ((col_idx, _col_name, col_typ), default) in column_details {
709 if let Some(src_idx) = col_to_source.get(&col_idx) {
710 project_key.push(*src_idx);
711 } else {
712 let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
713 project_key.push(typ.arity() + map_exprs.len());
714 map_exprs.push(hir);
715 }
716 }
717
718 Ok(expr.map(map_exprs).project(project_key))
719}
720
721pub struct ReadThenWritePlan {
723 pub id: CatalogItemId,
724 pub selection: HirRelationExpr,
729 pub assignments: BTreeMap<usize, HirScalarExpr>,
731 pub finishing: RowSetFinishing,
732}
733
734pub fn plan_delete_query(
735 scx: &StatementContext,
736 mut delete_stmt: DeleteStatement<Aug>,
737) -> Result<ReadThenWritePlan, PlanError> {
738 transform_ast::transform(scx, &mut delete_stmt)?;
739
740 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
741 plan_mutation_query_inner(
742 qcx,
743 delete_stmt.table_name,
744 delete_stmt.alias,
745 delete_stmt.using,
746 vec![],
747 delete_stmt.selection,
748 )
749}
750
751pub fn plan_update_query(
752 scx: &StatementContext,
753 mut update_stmt: UpdateStatement<Aug>,
754) -> Result<ReadThenWritePlan, PlanError> {
755 transform_ast::transform(scx, &mut update_stmt)?;
756
757 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
758
759 plan_mutation_query_inner(
760 qcx,
761 update_stmt.table_name,
762 update_stmt.alias,
763 vec![],
764 update_stmt.assignments,
765 update_stmt.selection,
766 )
767}
768
769pub fn plan_mutation_query_inner(
770 qcx: QueryContext,
771 table_name: ResolvedItemName,
772 alias: Option<TableAlias>,
773 using: Vec<TableWithJoins<Aug>>,
774 assignments: Vec<Assignment<Aug>>,
775 selection: Option<Expr<Aug>>,
776) -> Result<ReadThenWritePlan, PlanError> {
777 let (id, version) = match table_name {
779 ResolvedItemName::Item { id, version, .. } => (id, version),
780 _ => sql_bail!("cannot mutate non-user table"),
781 };
782
783 let item = qcx.scx.get_item(&id).at_version(version);
785 if item.item_type() != CatalogItemType::Table {
786 sql_bail!(
787 "cannot mutate {} '{}'",
788 item.item_type(),
789 table_name.full_name_str()
790 );
791 }
792 let _ = item.writable_table_details().ok_or_else(|| {
793 sql_err!(
794 "cannot mutate non-writeable table '{}'",
795 table_name.full_name_str()
796 )
797 })?;
798 if id.is_system() {
799 sql_bail!(
800 "cannot mutate system table '{}'",
801 table_name.full_name_str()
802 );
803 }
804
805 let (mut get, scope) = qcx.resolve_table_name(table_name)?;
807 let scope = plan_table_alias(scope, alias.as_ref())?;
808 let desc = item.desc(&qcx.scx.catalog.resolve_full_name(item.name()))?;
809 let relation_type = qcx.relation_type(&get);
810
811 if using.is_empty() {
812 if let Some(expr) = selection {
813 let ecx = &ExprContext {
814 qcx: &qcx,
815 name: "WHERE clause",
816 scope: &scope,
817 relation_type: &relation_type,
818 allow_aggregates: false,
819 allow_subqueries: true,
820 allow_parameters: true,
821 allow_windows: false,
822 };
823 let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
824 get = get.filter(vec![expr]);
825 }
826 } else {
827 get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
828 }
829
830 let mut sets = BTreeMap::new();
831 for Assignment { id, value } in assignments {
832 let name = normalize::column_name(id);
834 match desc.get_by_name(&name) {
835 Some((idx, typ)) => {
836 let ecx = &ExprContext {
837 qcx: &qcx,
838 name: "SET clause",
839 scope: &scope,
840 relation_type: &relation_type,
841 allow_aggregates: false,
842 allow_subqueries: false,
843 allow_parameters: true,
844 allow_windows: false,
845 };
846 let expr = plan_expr(ecx, &value)?.cast_to(
847 ecx,
848 CastContext::Assignment,
849 &typ.scalar_type,
850 )?;
851
852 if sets.insert(idx, expr).is_some() {
853 sql_bail!("column {} set twice", name)
854 }
855 }
856 None => sql_bail!("unknown column {}", name),
857 };
858 }
859
860 let finishing = RowSetFinishing {
861 order_by: vec![],
862 limit: None,
863 offset: 0,
864 project: (0..desc.arity()).collect(),
865 };
866
867 Ok(ReadThenWritePlan {
868 id,
869 selection: get,
870 finishing,
871 assignments: sets,
872 })
873}
874
875fn handle_mutation_using_clause(
887 qcx: &QueryContext,
888 selection: Option<Expr<Aug>>,
889 using: Vec<TableWithJoins<Aug>>,
890 get: HirRelationExpr,
891 outer_scope: Scope,
892) -> Result<HirRelationExpr, PlanError> {
893 let (mut using_rel_expr, using_scope) =
897 using.into_iter().try_fold(plan_join_identity(), |l, twj| {
898 let (left, left_scope) = l;
899 plan_join(
900 qcx,
901 left,
902 left_scope,
903 &Join {
904 relation: TableFactor::NestedJoin {
905 join: Box::new(twj),
906 alias: None,
907 },
908 join_operator: JoinOperator::CrossJoin,
909 },
910 )
911 })?;
912
913 if let Some(expr) = selection {
914 let on = HirScalarExpr::literal_true();
920 let joined = using_rel_expr
921 .clone()
922 .join(get.clone(), on, JoinKind::Inner);
923 let joined_scope = using_scope.product(outer_scope)?;
924 let joined_relation_type = qcx.relation_type(&joined);
925
926 let ecx = &ExprContext {
927 qcx,
928 name: "WHERE clause",
929 scope: &joined_scope,
930 relation_type: &joined_relation_type,
931 allow_aggregates: false,
932 allow_subqueries: true,
933 allow_parameters: true,
934 allow_windows: false,
935 };
936
937 let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
939
940 let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
944 use mz_expr::visit::Visit;
946 expr.visit_mut_post(&mut |e| {
947 if let HirScalarExpr::Column(c, _name) = e {
948 if c.column >= using_rel_arity {
949 c.level += 1;
950 c.column -= using_rel_arity;
951 };
952 }
953 })?;
954
955 using_rel_expr = using_rel_expr.filter(vec![expr]);
959 } else {
960 let _joined_scope = using_scope.product(outer_scope)?;
963 }
964 Ok(get.filter(vec![using_rel_expr.exists()]))
975}
976
977#[derive(Debug)]
978pub(crate) struct CastRelationError {
979 pub(crate) column: usize,
980 pub(crate) source_type: SqlScalarType,
981 pub(crate) target_type: SqlScalarType,
982}
983
984pub(crate) fn cast_relation<'a, I>(
988 qcx: &QueryContext,
989 ccx: CastContext,
990 expr: HirRelationExpr,
991 target_types: I,
992) -> Result<HirRelationExpr, CastRelationError>
993where
994 I: IntoIterator<Item = &'a SqlScalarType>,
995{
996 let ecx = &ExprContext {
997 qcx,
998 name: "values",
999 scope: &Scope::empty(),
1000 relation_type: &qcx.relation_type(&expr),
1001 allow_aggregates: false,
1002 allow_subqueries: true,
1003 allow_parameters: true,
1004 allow_windows: false,
1005 };
1006 let mut map_exprs = vec![];
1007 let mut project_key = vec![];
1008 for (i, target_typ) in target_types.into_iter().enumerate() {
1009 let expr = HirScalarExpr::column(i);
1010 match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
1014 Ok(cast_expr) => {
1015 if expr == cast_expr {
1016 project_key.push(i);
1018 } else {
1019 project_key.push(ecx.relation_type.arity() + map_exprs.len());
1021 map_exprs.push(cast_expr);
1022 }
1023 }
1024 Err(_) => {
1025 return Err(CastRelationError {
1026 column: i,
1027 source_type: ecx.scalar_type(&expr),
1028 target_type: target_typ.clone(),
1029 });
1030 }
1031 }
1032 }
1033 Ok(expr.map(map_exprs).project(project_key))
1034}
1035
1036pub fn plan_as_of(
1039 scx: &StatementContext,
1040 as_of: Option<AsOf<Aug>>,
1041) -> Result<QueryWhen, PlanError> {
1042 match as_of {
1043 None => Ok(QueryWhen::Immediately),
1044 Some(as_of) => match as_of {
1045 AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1046 AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1047 },
1048 }
1049}
1050
1051pub fn plan_as_of_or_up_to(
1061 scx: &StatementContext,
1062 mut expr: Expr<Aug>,
1063) -> Result<mz_repr::Timestamp, PlanError> {
1064 let scope = Scope::empty();
1065 let desc = RelationDesc::empty();
1066 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1069 transform_ast::transform(scx, &mut expr)?;
1070 let ecx = &ExprContext {
1071 qcx: &qcx,
1072 name: "AS OF or UP TO",
1073 scope: &scope,
1074 relation_type: desc.typ(),
1075 allow_aggregates: false,
1076 allow_subqueries: false,
1077 allow_parameters: false,
1078 allow_windows: false,
1079 };
1080 let hir = plan_expr(ecx, &expr)?.cast_to(
1081 ecx,
1082 CastContext::Assignment,
1083 &SqlScalarType::MzTimestamp,
1084 )?;
1085 if hir.contains_unmaterializable() {
1086 bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1087 }
1088 let timestamp = hir
1095 .into_literal_mz_timestamp()
1096 .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1097 Ok(timestamp)
1098}
1099
1100pub fn plan_secret_as(
1102 scx: &StatementContext,
1103 mut expr: Expr<Aug>,
1104) -> Result<MirScalarExpr, PlanError> {
1105 let scope = Scope::empty();
1106 let desc = RelationDesc::empty();
1107 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1108
1109 transform_ast::transform(scx, &mut expr)?;
1110
1111 let ecx = &ExprContext {
1112 qcx: &qcx,
1113 name: "AS",
1114 scope: &scope,
1115 relation_type: desc.typ(),
1116 allow_aggregates: false,
1117 allow_subqueries: false,
1118 allow_parameters: false,
1119 allow_windows: false,
1120 };
1121 let expr = plan_expr(ecx, &expr)?
1122 .type_as(ecx, &SqlScalarType::Bytes)?
1123 .lower_uncorrelated()?;
1124 Ok(expr)
1125}
1126
1127pub fn plan_webhook_validate_using(
1129 scx: &StatementContext,
1130 validate_using: CreateWebhookSourceCheck<Aug>,
1131) -> Result<WebhookValidation, PlanError> {
1132 let qcx = QueryContext::root(scx, QueryLifetime::Source);
1133
1134 let CreateWebhookSourceCheck {
1135 options,
1136 using: mut expr,
1137 } = validate_using;
1138
1139 let mut column_typs = vec![];
1140 let mut column_names = vec![];
1141
1142 let (bodies, headers, secrets) = options
1143 .map(|o| (o.bodies, o.headers, o.secrets))
1144 .unwrap_or_default();
1145
1146 let mut body_tuples = vec![];
1148 for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1149 let scalar_type = use_bytes
1150 .then_some(SqlScalarType::Bytes)
1151 .unwrap_or(SqlScalarType::String);
1152 let name = alias
1153 .map(|a| a.into_string())
1154 .unwrap_or_else(|| "body".to_string());
1155
1156 column_typs.push(SqlColumnType {
1157 scalar_type,
1158 nullable: false,
1159 });
1160 column_names.push(name);
1161
1162 let column_idx = column_typs.len() - 1;
1164 assert_eq!(
1166 column_idx,
1167 column_names.len() - 1,
1168 "body column names and types don't match"
1169 );
1170 body_tuples.push((column_idx, use_bytes));
1171 }
1172
1173 let mut header_tuples = vec![];
1175
1176 for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1177 let value_type = use_bytes
1178 .then_some(SqlScalarType::Bytes)
1179 .unwrap_or(SqlScalarType::String);
1180 let name = alias
1181 .map(|a| a.into_string())
1182 .unwrap_or_else(|| "headers".to_string());
1183
1184 column_typs.push(SqlColumnType {
1185 scalar_type: SqlScalarType::Map {
1186 value_type: Box::new(value_type),
1187 custom_id: None,
1188 },
1189 nullable: false,
1190 });
1191 column_names.push(name);
1192
1193 let column_idx = column_typs.len() - 1;
1195 assert_eq!(
1197 column_idx,
1198 column_names.len() - 1,
1199 "header column names and types don't match"
1200 );
1201 header_tuples.push((column_idx, use_bytes));
1202 }
1203
1204 let mut validation_secrets = vec![];
1206
1207 for CreateWebhookSourceSecret {
1208 secret,
1209 alias,
1210 use_bytes,
1211 } in secrets
1212 {
1213 let scalar_type = use_bytes
1215 .then_some(SqlScalarType::Bytes)
1216 .unwrap_or(SqlScalarType::String);
1217
1218 column_typs.push(SqlColumnType {
1219 scalar_type,
1220 nullable: false,
1221 });
1222 let ResolvedItemName::Item {
1223 id,
1224 full_name: FullItemName { item, .. },
1225 ..
1226 } = secret
1227 else {
1228 return Err(PlanError::InvalidSecret(Box::new(secret)));
1229 };
1230
1231 let name = if let Some(alias) = alias {
1233 alias.into_string()
1234 } else {
1235 item
1236 };
1237 column_names.push(name);
1238
1239 let column_idx = column_typs.len() - 1;
1242 assert_eq!(
1244 column_idx,
1245 column_names.len() - 1,
1246 "column names and types don't match"
1247 );
1248
1249 validation_secrets.push(WebhookValidationSecret {
1250 id,
1251 column_idx,
1252 use_bytes,
1253 });
1254 }
1255
1256 let relation_typ = SqlRelationType::new(column_typs);
1257 let desc = RelationDesc::new(relation_typ, column_names.clone());
1258 let scope = Scope::from_source(None, column_names);
1259
1260 transform_ast::transform(scx, &mut expr)?;
1261
1262 let ecx = &ExprContext {
1263 qcx: &qcx,
1264 name: "CHECK",
1265 scope: &scope,
1266 relation_type: desc.typ(),
1267 allow_aggregates: false,
1268 allow_subqueries: false,
1269 allow_parameters: false,
1270 allow_windows: false,
1271 };
1272 let expr = plan_expr(ecx, &expr)?
1273 .type_as(ecx, &SqlScalarType::Bool)?
1274 .lower_uncorrelated()?;
1275 let validation = WebhookValidation {
1276 expression: expr,
1277 relation_desc: desc,
1278 bodies: body_tuples,
1279 headers: header_tuples,
1280 secrets: validation_secrets,
1281 };
1282 Ok(validation)
1283}
1284
1285pub fn plan_default_expr(
1286 scx: &StatementContext,
1287 expr: &Expr<Aug>,
1288 target_ty: &SqlScalarType,
1289) -> Result<HirScalarExpr, PlanError> {
1290 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1291 let ecx = &ExprContext {
1292 qcx: &qcx,
1293 name: "DEFAULT expression",
1294 scope: &Scope::empty(),
1295 relation_type: &SqlRelationType::empty(),
1296 allow_aggregates: false,
1297 allow_subqueries: false,
1298 allow_parameters: false,
1299 allow_windows: false,
1300 };
1301 let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1302 Ok(hir)
1303}
1304
1305pub fn plan_params<'a>(
1306 scx: &'a StatementContext,
1307 params: Vec<Expr<Aug>>,
1308 desc: &StatementDesc,
1309) -> Result<Params, PlanError> {
1310 if params.len() != desc.param_types.len() {
1311 sql_bail!(
1312 "expected {} params, got {}",
1313 desc.param_types.len(),
1314 params.len()
1315 );
1316 }
1317
1318 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1319
1320 let mut datums = Row::default();
1321 let mut packer = datums.packer();
1322 let mut actual_types = Vec::new();
1323 let temp_storage = &RowArena::new();
1324 for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1325 transform_ast::transform(scx, &mut expr)?;
1326
1327 let ecx = execute_expr_context(&qcx);
1328 let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1329 let actual_ty = ecx.scalar_type(&ex);
1330 if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1331 return Err(PlanError::WrongParameterType(
1332 i + 1,
1333 ecx.humanize_scalar_type(expected_ty, false),
1334 ecx.humanize_scalar_type(&actual_ty, false),
1335 ));
1336 }
1337 let ex = ex.lower_uncorrelated()?;
1338 let evaled = ex.eval(&[], temp_storage)?;
1339 packer.push(evaled);
1340 actual_types.push(actual_ty);
1341 }
1342 Ok(Params {
1343 datums,
1344 execute_types: actual_types,
1345 expected_types: desc.param_types.clone(),
1346 })
1347}
1348
1349static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
1350static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1351
1352pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1354 ExprContext {
1355 qcx,
1356 name: "EXECUTE",
1357 scope: &EXECUTE_CONTEXT_SCOPE,
1358 relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1359 allow_aggregates: false,
1360 allow_subqueries: false,
1361 allow_parameters: false,
1362 allow_windows: false,
1363 }
1364}
1365
1366pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
1371 LazyLock::new(|| CastContext::Assignment);
1372
1373pub fn plan_index_exprs<'a>(
1374 scx: &'a StatementContext,
1375 on_desc: &RelationDesc,
1376 exprs: Vec<Expr<Aug>>,
1377) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1378 let scope = Scope::from_source(None, on_desc.iter_names());
1379 let qcx = QueryContext::root(scx, QueryLifetime::Index);
1380
1381 let ecx = &ExprContext {
1382 qcx: &qcx,
1383 name: "CREATE INDEX",
1384 scope: &scope,
1385 relation_type: on_desc.typ(),
1386 allow_aggregates: false,
1387 allow_subqueries: false,
1388 allow_parameters: false,
1389 allow_windows: false,
1390 };
1391 let mut out = vec![];
1392 for mut expr in exprs {
1393 transform_ast::transform(scx, &mut expr)?;
1394 let expr = plan_expr_or_col_index(ecx, &expr)?;
1395 let mut expr = expr.lower_uncorrelated()?;
1396 expr.reduce(&on_desc.typ().column_types);
1397 out.push(expr);
1398 }
1399 Ok(out)
1400}
1401
1402fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1403 match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1404 Some(column) => Ok(HirScalarExpr::column(column)),
1405 _ => plan_expr(ecx, e)?.type_as_any(ecx),
1406 }
1407}
1408
1409fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1410 match e {
1411 Expr::Value(Value::Number(n)) => {
1412 let n = n.parse::<usize>().map_err(|e| {
1413 sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1414 })?;
1415 if n < 1 || n > max {
1416 sql_bail!(
1417 "column reference {} in {} is out of range (1 - {})",
1418 n,
1419 name,
1420 max
1421 );
1422 }
1423 Ok(Some(n - 1))
1424 }
1425 _ => Ok(None),
1426 }
1427}
1428
1429struct PlannedQuery {
1430 expr: HirRelationExpr,
1431 scope: Scope,
1432 order_by: Vec<ColumnOrder>,
1433 limit: Option<HirScalarExpr>,
1434 offset: HirScalarExpr,
1440 project: Vec<usize>,
1441 group_size_hints: GroupSizeHints,
1442}
1443
1444fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1445 qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1446}
1447
1448fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1449 let cte_bindings = plan_ctes(qcx, q)?;
1452
1453 let limit = match &q.limit {
1454 None => None,
1455 Some(Limit {
1456 quantity,
1457 with_ties: false,
1458 }) => {
1459 let ecx = &ExprContext {
1460 qcx,
1461 name: "LIMIT",
1462 scope: &Scope::empty(),
1463 relation_type: &SqlRelationType::empty(),
1464 allow_aggregates: false,
1465 allow_subqueries: true,
1466 allow_parameters: true,
1467 allow_windows: false,
1468 };
1469 let limit = plan_expr(ecx, quantity)?;
1470 let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1471
1472 let limit = if limit.is_constant() {
1473 let arena = RowArena::new();
1474 let limit = limit.lower_uncorrelated()?;
1475
1476 match limit.eval(&[], &arena)? {
1480 d @ Datum::Int64(v) if v >= 0 => {
1481 HirScalarExpr::literal(d, SqlScalarType::Int64)
1482 }
1483 d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1484 Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1485 _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1486 }
1487 } else {
1488 qcx.scx
1490 .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1491 limit
1492 };
1493
1494 Some(limit)
1495 }
1496 Some(Limit {
1497 quantity: _,
1498 with_ties: true,
1499 }) => bail_unsupported!("FETCH ... WITH TIES"),
1500 };
1501
1502 let offset = match &q.offset {
1503 None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1504 Some(offset) => {
1505 let ecx = &ExprContext {
1506 qcx,
1507 name: "OFFSET",
1508 scope: &Scope::empty(),
1509 relation_type: &SqlRelationType::empty(),
1510 allow_aggregates: false,
1511 allow_subqueries: false,
1512 allow_parameters: true,
1513 allow_windows: false,
1514 };
1515 let offset = plan_expr(ecx, offset)?;
1516 let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1517
1518 let offset = if offset.is_constant() {
1519 let offset_value = offset_into_value(offset)?;
1521 HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1522 } else {
1523 if !offset.contains_parameters() {
1527 return Err(PlanError::InvalidOffset(format!(
1528 "must be simplifiable to a constant, possibly after parameter binding, got {}",
1529 offset
1530 )));
1531 }
1532 offset
1533 };
1534 offset
1535 }
1536 };
1537
1538 let mut planned_query = match &q.body {
1539 SetExpr::Select(s) => {
1540 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1542 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1543
1544 let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1545 PlannedQuery {
1546 expr: plan.expr,
1547 scope: plan.scope,
1548 order_by: plan.order_by,
1549 project: plan.project,
1550 limit,
1551 offset,
1552 group_size_hints,
1553 }
1554 }
1555 _ => {
1556 let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1557 let ecx = &ExprContext {
1558 qcx,
1559 name: "ORDER BY clause of a set expression",
1560 scope: &scope,
1561 relation_type: &qcx.relation_type(&expr),
1562 allow_aggregates: false,
1563 allow_subqueries: true,
1564 allow_parameters: true,
1565 allow_windows: false,
1566 };
1567 let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1568 let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1569 let project = (0..ecx.relation_type.arity()).collect();
1570 PlannedQuery {
1571 expr: expr.map(map_exprs),
1572 scope,
1573 order_by,
1574 limit,
1575 project,
1576 offset,
1577 group_size_hints: GroupSizeHints::default(),
1578 }
1579 }
1580 };
1581
1582 match &q.ctes {
1584 CteBlock::Simple(_) => {
1585 for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1586 if let Some(cte) = qcx.ctes.remove(&id) {
1587 planned_query.expr = HirRelationExpr::Let {
1588 name: cte.name,
1589 id: id.clone(),
1590 value: Box::new(value),
1591 body: Box::new(planned_query.expr),
1592 };
1593 }
1594 if let Some(shadowed_val) = shadowed_val {
1595 qcx.ctes.insert(id, shadowed_val);
1596 }
1597 }
1598 }
1599 CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1600 let MutRecBlockOptionExtracted {
1601 recursion_limit,
1602 return_at_recursion_limit,
1603 error_at_recursion_limit,
1604 seen: _,
1605 } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1606 let limit = match (recursion_limit, return_at_recursion_limit, error_at_recursion_limit) {
1607 (None, None, None) => None,
1608 (Some(max_iters), None, None) => Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT)),
1609 (None, Some(max_iters), None) => Some((max_iters, true)),
1610 (None, None, Some(max_iters)) => Some((max_iters, false)),
1611 _ => {
1612 return Err(InvalidWmrRecursionLimit("More than one recursion limit given. Please give at most one of RECURSION LIMIT, ERROR AT RECURSION LIMIT, RETURN AT RECURSION LIMIT.".to_owned()));
1613 }
1614 }.try_map(|(max_iters, return_at_limit)| Ok::<LetRecLimit, PlanError>(LetRecLimit {
1615 max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit("Recursion limit has to be greater than 0.".to_owned()))?,
1616 return_at_limit: *return_at_limit,
1617 }))?;
1618
1619 let mut bindings = Vec::new();
1620 for (id, value, shadowed_val) in cte_bindings.into_iter() {
1621 if let Some(cte) = qcx.ctes.remove(&id) {
1622 bindings.push((cte.name, id, value, cte.desc.typ().clone()));
1623 }
1624 if let Some(shadowed_val) = shadowed_val {
1625 qcx.ctes.insert(id, shadowed_val);
1626 }
1627 }
1628 if !bindings.is_empty() {
1629 planned_query.expr = HirRelationExpr::LetRec {
1630 limit,
1631 bindings,
1632 body: Box::new(planned_query.expr),
1633 }
1634 }
1635 }
1636 }
1637
1638 Ok(planned_query)
1639}
1640
1641pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1643 let offset = offset
1644 .try_into_literal_int64()
1645 .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1646 if offset < 0 {
1647 return Err(PlanError::InvalidOffset(format!(
1648 "must not be negative, got {}",
1649 offset
1650 )));
1651 }
1652 Ok(offset)
1653}
1654
1655generate_extracted_config!(
1656 MutRecBlockOption,
1657 (RecursionLimit, u64),
1658 (ReturnAtRecursionLimit, u64),
1659 (ErrorAtRecursionLimit, u64)
1660);
1661
1662pub fn plan_ctes(
1667 qcx: &mut QueryContext,
1668 q: &Query<Aug>,
1669) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1670 let mut result = Vec::new();
1672 let mut shadowed_descs = BTreeMap::new();
1675
1676 if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1678 sql_bail!(
1679 "WITH query name {} specified more than once",
1680 normalize::ident_ref(ident).quoted()
1681 )
1682 }
1683
1684 match &q.ctes {
1685 CteBlock::Simple(ctes) => {
1686 for cte in ctes.iter() {
1688 let cte_name = normalize::ident(cte.alias.name.clone());
1689 let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1690 let typ = qcx.relation_type(&val);
1691 let mut desc = RelationDesc::new(typ, scope.column_names());
1692 plan_utils::maybe_rename_columns(
1693 format!("CTE {}", cte.alias.name),
1694 &mut desc,
1695 &cte.alias.columns,
1696 )?;
1697 let shadowed = qcx.ctes.insert(
1699 cte.id,
1700 CteDesc {
1701 name: cte_name,
1702 desc,
1703 },
1704 );
1705
1706 result.push((cte.id, val, shadowed));
1707 }
1708 }
1709 CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1710 for cte in ctes.iter() {
1712 let cte_name = normalize::ident(cte.name.clone());
1713 let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1714 for column in cte.columns.iter() {
1715 desc_columns.push((
1716 normalize::column_name(column.name.clone()),
1717 SqlColumnType {
1718 scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1719 nullable: true,
1720 },
1721 ));
1722 }
1723 let desc = RelationDesc::from_names_and_types(desc_columns);
1724 let shadowed = qcx.ctes.insert(
1725 cte.id,
1726 CteDesc {
1727 name: cte_name,
1728 desc,
1729 },
1730 );
1731 if let Some(shadowed) = shadowed {
1733 shadowed_descs.insert(cte.id, shadowed);
1734 }
1735 }
1736
1737 for cte in ctes.iter() {
1739 let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1740
1741 let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1742
1743 if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1744 sql_bail!(
1747 "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1748 );
1749 }
1750
1751 if !proposed_typ.keys.is_empty() {
1752 sql_bail!("[internal error]: WMR CTEs do not support keys");
1755 }
1756
1757 let derived_typ = qcx.relation_type(&val);
1759
1760 let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1761 let cte_name = normalize::ident(cte.name.clone());
1762 let proposed_typ = proposed_typ
1763 .column_types
1764 .iter()
1765 .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1766 .collect::<Vec<_>>();
1767 let inferred_typ = derived_typ
1768 .column_types
1769 .iter()
1770 .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1771 .collect::<Vec<_>>();
1772 Err(PlanError::RecursiveTypeMismatch(
1773 cte_name,
1774 proposed_typ,
1775 inferred_typ,
1776 ))
1777 };
1778
1779 if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1780 return type_err(proposed_typ, derived_typ);
1781 }
1782
1783 let val = match cast_relation(
1785 qcx,
1786 CastContext::Assignment,
1791 val,
1792 proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1793 ) {
1794 Ok(val) => val,
1795 Err(_) => return type_err(proposed_typ, derived_typ),
1796 };
1797
1798 result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1799 }
1800 }
1801 }
1802
1803 Ok(result)
1804}
1805
1806pub fn plan_nested_query(
1807 qcx: &mut QueryContext,
1808 q: &Query<Aug>,
1809) -> Result<(HirRelationExpr, Scope), PlanError> {
1810 let PlannedQuery {
1811 mut expr,
1812 scope,
1813 order_by,
1814 limit,
1815 offset,
1816 project,
1817 group_size_hints,
1818 } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1819 if limit.is_some()
1820 || !offset
1821 .clone()
1822 .try_into_literal_int64()
1823 .is_ok_and(|offset| offset == 0)
1824 {
1825 expr = HirRelationExpr::top_k(
1826 expr,
1827 vec![],
1828 order_by,
1829 limit,
1830 offset,
1831 group_size_hints.limit_input_group_size,
1832 );
1833 }
1834 Ok((expr.project(project), scope))
1835}
1836
1837fn plan_set_expr(
1838 qcx: &mut QueryContext,
1839 q: &SetExpr<Aug>,
1840) -> Result<(HirRelationExpr, Scope), PlanError> {
1841 match q {
1842 SetExpr::Select(select) => {
1843 let order_by_exprs = Vec::new();
1844 let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
1845 assert!(plan.order_by.is_empty());
1848 Ok((plan.expr.project(plan.project), plan.scope))
1849 }
1850 SetExpr::SetOperation {
1851 op,
1852 all,
1853 left,
1854 right,
1855 } => {
1856 let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
1858 let (right_expr, right_scope) =
1859 qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;
1860
1861 let left_type = qcx.relation_type(&left_expr);
1863 let right_type = qcx.relation_type(&right_expr);
1864 if left_type.arity() != right_type.arity() {
1865 sql_bail!(
1866 "each {} query must have the same number of columns: {} vs {}",
1867 op,
1868 left_type.arity(),
1869 right_type.arity(),
1870 );
1871 }
1872
1873 let left_ecx = &ExprContext {
1878 qcx,
1879 name: &op.to_string(),
1880 scope: &left_scope,
1881 relation_type: &left_type,
1882 allow_aggregates: false,
1883 allow_subqueries: false,
1884 allow_parameters: false,
1885 allow_windows: false,
1886 };
1887 let right_ecx = &ExprContext {
1888 qcx,
1889 name: &op.to_string(),
1890 scope: &right_scope,
1891 relation_type: &right_type,
1892 allow_aggregates: false,
1893 allow_subqueries: false,
1894 allow_parameters: false,
1895 allow_windows: false,
1896 };
1897 let mut left_casts = vec![];
1898 let mut right_casts = vec![];
1899 for (i, (left_type, right_type)) in left_type
1900 .column_types
1901 .iter()
1902 .zip_eq(right_type.column_types.iter())
1903 .enumerate()
1904 {
1905 let types = &[
1906 CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
1907 CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
1908 ];
1909 let target =
1910 typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
1911 match typeconv::plan_cast(
1912 left_ecx,
1913 CastContext::Implicit,
1914 HirScalarExpr::column(i),
1915 &target,
1916 ) {
1917 Ok(expr) => left_casts.push(expr),
1918 Err(_) => sql_bail!(
1919 "{} types {} and {} cannot be matched",
1920 op,
1921 qcx.humanize_scalar_type(&left_type.scalar_type, false),
1922 qcx.humanize_scalar_type(&target, false),
1923 ),
1924 }
1925 match typeconv::plan_cast(
1926 right_ecx,
1927 CastContext::Implicit,
1928 HirScalarExpr::column(i),
1929 &target,
1930 ) {
1931 Ok(expr) => right_casts.push(expr),
1932 Err(_) => sql_bail!(
1933 "{} types {} and {} cannot be matched",
1934 op,
1935 qcx.humanize_scalar_type(&target, false),
1936 qcx.humanize_scalar_type(&right_type.scalar_type, false),
1937 ),
1938 }
1939 }
1940 let lhs = if left_casts
1941 .iter()
1942 .enumerate()
1943 .any(|(i, e)| e != &HirScalarExpr::column(i))
1944 {
1945 let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
1946 left_expr.map(left_casts).project(project_key)
1947 } else {
1948 left_expr
1949 };
1950 let rhs = if right_casts
1951 .iter()
1952 .enumerate()
1953 .any(|(i, e)| e != &HirScalarExpr::column(i))
1954 {
1955 let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
1956 right_expr.map(right_casts).project(project_key)
1957 } else {
1958 right_expr
1959 };
1960
1961 let relation_expr = match op {
1962 SetOperator::Union => {
1963 if *all {
1964 lhs.union(rhs)
1965 } else {
1966 lhs.union(rhs).distinct()
1967 }
1968 }
1969 SetOperator::Except => Hir::except(all, lhs, rhs),
1970 SetOperator::Intersect => {
1971 let left_clone = lhs.clone();
1977 if *all {
1978 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1979 } else {
1980 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1981 .distinct()
1982 }
1983 }
1984 };
1985 let scope = Scope::from_source(
1986 None,
1987 left_scope.column_names(),
1989 );
1990
1991 Ok((relation_expr, scope))
1992 }
1993 SetExpr::Values(Values(values)) => plan_values(qcx, values),
1994 SetExpr::Table(name) => {
1995 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
1996 Ok((expr, scope))
1997 }
1998 SetExpr::Query(query) => {
1999 let (expr, scope) = plan_nested_query(qcx, query)?;
2000 Ok((expr, scope))
2001 }
2002 SetExpr::Show(stmt) => {
2003 if !qcx.lifetime.allow_show() {
2017 return Err(PlanError::ShowCommandInView);
2018 }
2019
2020 fn to_hirscope(
2023 plan: ShowCreatePlan,
2024 desc: StatementDesc,
2025 ) -> Result<(HirRelationExpr, Scope), PlanError> {
2026 let rows = vec![plan.row.iter().collect::<Vec<_>>()];
2027 let desc = desc.relation_desc.expect("must exist");
2028 let expr = HirRelationExpr::constant(rows, desc.typ().clone());
2029 let scope = Scope::from_source(None, desc.iter_names());
2030 Ok((expr, scope))
2031 }
2032
2033 match stmt.clone() {
2034 ShowStatement::ShowColumns(stmt) => {
2035 show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
2036 }
2037 ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
2038 show::plan_show_create_connection(qcx.scx, stmt.clone())?,
2039 show::describe_show_create_connection(qcx.scx, stmt)?,
2040 ),
2041 ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
2042 show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
2043 show::describe_show_create_cluster(qcx.scx, stmt)?,
2044 ),
2045 ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
2046 show::plan_show_create_index(qcx.scx, stmt.clone())?,
2047 show::describe_show_create_index(qcx.scx, stmt)?,
2048 ),
2049 ShowStatement::ShowCreateSink(stmt) => to_hirscope(
2050 show::plan_show_create_sink(qcx.scx, stmt.clone())?,
2051 show::describe_show_create_sink(qcx.scx, stmt)?,
2052 ),
2053 ShowStatement::ShowCreateSource(stmt) => to_hirscope(
2054 show::plan_show_create_source(qcx.scx, stmt.clone())?,
2055 show::describe_show_create_source(qcx.scx, stmt)?,
2056 ),
2057 ShowStatement::ShowCreateTable(stmt) => to_hirscope(
2058 show::plan_show_create_table(qcx.scx, stmt.clone())?,
2059 show::describe_show_create_table(qcx.scx, stmt)?,
2060 ),
2061 ShowStatement::ShowCreateView(stmt) => to_hirscope(
2062 show::plan_show_create_view(qcx.scx, stmt.clone())?,
2063 show::describe_show_create_view(qcx.scx, stmt)?,
2064 ),
2065 ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
2066 show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
2067 show::describe_show_create_materialized_view(qcx.scx, stmt)?,
2068 ),
2069 ShowStatement::ShowCreateType(stmt) => to_hirscope(
2070 show::plan_show_create_type(qcx.scx, stmt.clone())?,
2071 show::describe_show_create_type(qcx.scx, stmt)?,
2072 ),
2073 ShowStatement::ShowObjects(stmt) => {
2074 show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
2075 }
2076 ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
2077 ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
2078 }
2079 }
2080 }
2081}
2082
2083fn plan_values(
2085 qcx: &QueryContext,
2086 values: &[Vec<Expr<Aug>>],
2087) -> Result<(HirRelationExpr, Scope), PlanError> {
2088 assert!(!values.is_empty());
2089
2090 let ecx = &ExprContext {
2091 qcx,
2092 name: "VALUES",
2093 scope: &Scope::empty(),
2094 relation_type: &SqlRelationType::empty(),
2095 allow_aggregates: false,
2096 allow_subqueries: true,
2097 allow_parameters: true,
2098 allow_windows: false,
2099 };
2100
2101 let ncols = values[0].len();
2102 let nrows = values.len();
2103
2104 let mut cols = vec![vec![]; ncols];
2107 for row in values {
2108 if row.len() != ncols {
2109 sql_bail!(
2110 "VALUES expression has varying number of columns: {} vs {}",
2111 row.len(),
2112 ncols
2113 );
2114 }
2115 for (i, v) in row.iter().enumerate() {
2116 cols[i].push(v);
2117 }
2118 }
2119
2120 let mut col_iters = Vec::with_capacity(ncols);
2122 let mut col_types = Vec::with_capacity(ncols);
2123 for col in &cols {
2124 let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2125 let mut col_type = ecx.column_type(&col[0]);
2126 for val in &col[1..] {
2127 col_type = col_type.union(&ecx.column_type(val))?;
2128 }
2129 col_types.push(col_type);
2130 col_iters.push(col.into_iter());
2131 }
2132
2133 let mut exprs = vec![];
2135 for _ in 0..nrows {
2136 for i in 0..ncols {
2137 exprs.push(col_iters[i].next().unwrap());
2138 }
2139 }
2140 let out = HirRelationExpr::CallTable {
2141 func: TableFunc::Wrap {
2142 width: ncols,
2143 types: col_types,
2144 },
2145 exprs,
2146 };
2147
2148 let mut scope = Scope::empty();
2150 for i in 0..ncols {
2151 let name = format!("column{}", i + 1);
2152 scope.items.push(ScopeItem::from_column_name(name));
2153 }
2154
2155 Ok((out, scope))
2156}
2157
2158fn plan_values_insert(
2168 qcx: &QueryContext,
2169 target_names: &[&ColumnName],
2170 target_types: &[&SqlScalarType],
2171 values: &[Vec<Expr<Aug>>],
2172) -> Result<HirRelationExpr, PlanError> {
2173 assert!(!values.is_empty());
2174
2175 if !values.iter().map(|row| row.len()).all_equal() {
2176 sql_bail!("VALUES lists must all be the same length");
2177 }
2178
2179 let ecx = &ExprContext {
2180 qcx,
2181 name: "VALUES",
2182 scope: &Scope::empty(),
2183 relation_type: &SqlRelationType::empty(),
2184 allow_aggregates: false,
2185 allow_subqueries: true,
2186 allow_parameters: true,
2187 allow_windows: false,
2188 };
2189
2190 let mut exprs = vec![];
2191 let mut types = vec![];
2192 for row in values {
2193 if row.len() > target_names.len() {
2194 sql_bail!("INSERT has more expressions than target columns");
2195 }
2196 for (column, val) in row.into_iter().enumerate() {
2197 let target_type = &target_types[column];
2198 let val = plan_expr(ecx, val)?;
2199 let val = typeconv::plan_coerce(ecx, val, target_type)?;
2200 let source_type = &ecx.scalar_type(&val);
2201 let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2202 Ok(val) => val,
2203 Err(_) => sql_bail!(
2204 "column {} is of type {} but expression is of type {}",
2205 target_names[column].quoted(),
2206 qcx.humanize_scalar_type(target_type, false),
2207 qcx.humanize_scalar_type(source_type, false),
2208 ),
2209 };
2210 if column >= types.len() {
2211 types.push(ecx.column_type(&val));
2212 } else {
2213 types[column] = types[column].union(&ecx.column_type(&val))?;
2214 }
2215 exprs.push(val);
2216 }
2217 }
2218
2219 Ok(HirRelationExpr::CallTable {
2220 func: TableFunc::Wrap {
2221 width: values[0].len(),
2222 types,
2223 },
2224 exprs,
2225 })
2226}
2227
2228fn plan_join_identity() -> (HirRelationExpr, Scope) {
2229 let typ = SqlRelationType::new(vec![]);
2230 let expr = HirRelationExpr::constant(vec![vec![]], typ);
2231 let scope = Scope::empty();
2232 (expr, scope)
2233}
2234
2235#[derive(Debug)]
2241struct SelectPlan {
2242 expr: HirRelationExpr,
2243 scope: Scope,
2244 order_by: Vec<ColumnOrder>,
2245 project: Vec<usize>,
2246}
2247
2248generate_extracted_config!(
2249 SelectOption,
2250 (ExpectedGroupSize, u64),
2251 (AggregateInputGroupSize, u64),
2252 (DistinctOnInputGroupSize, u64),
2253 (LimitInputGroupSize, u64)
2254);
2255
2256fn plan_select_from_where(
2274 qcx: &QueryContext,
2275 mut s: Select<Aug>,
2276 mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2277) -> Result<SelectPlan, PlanError> {
2278 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2285 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2286
2287 let (mut relation_expr, mut from_scope) =
2289 s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2290 let (left, left_scope) = l;
2291 plan_join(
2292 qcx,
2293 left,
2294 left_scope,
2295 &Join {
2296 relation: TableFactor::NestedJoin {
2297 join: Box::new(twj.clone()),
2298 alias: None,
2299 },
2300 join_operator: JoinOperator::CrossJoin,
2301 },
2302 )
2303 })?;
2304
2305 if let Some(selection) = &s.selection {
2307 let ecx = &ExprContext {
2308 qcx,
2309 name: "WHERE clause",
2310 scope: &from_scope,
2311 relation_type: &qcx.relation_type(&relation_expr),
2312 allow_aggregates: false,
2313 allow_subqueries: true,
2314 allow_parameters: true,
2315 allow_windows: false,
2316 };
2317 let expr = plan_expr(ecx, selection)
2318 .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2319 .type_as(ecx, &SqlScalarType::Bool)?;
2320 relation_expr = relation_expr.filter(vec![expr]);
2321 }
2322
2323 let (aggregates, table_funcs) = {
2326 let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2327 visitor.visit_select_mut(&mut s);
2328 for o in order_by_exprs.iter_mut() {
2329 visitor.visit_order_by_expr_mut(o);
2330 }
2331 visitor.into_result()?
2332 };
2333 let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2334 if !table_funcs.is_empty() {
2335 let (expr, scope) = plan_scalar_table_funcs(
2336 qcx,
2337 table_funcs,
2338 &mut table_func_names,
2339 &relation_expr,
2340 &from_scope,
2341 )?;
2342 relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2343 from_scope = from_scope.product(scope)?;
2344 }
2345
2346 let projection = {
2348 let ecx = &ExprContext {
2349 qcx,
2350 name: "SELECT clause",
2351 scope: &from_scope,
2352 relation_type: &qcx.relation_type(&relation_expr),
2353 allow_aggregates: true,
2354 allow_subqueries: true,
2355 allow_parameters: true,
2356 allow_windows: true,
2357 };
2358 let mut out = vec![];
2359 for si in &s.projection {
2360 if *si == SelectItem::Wildcard && s.from.is_empty() {
2361 sql_bail!("SELECT * with no tables specified is not valid");
2362 }
2363 out.extend(expand_select_item(ecx, si, &table_func_names)?);
2364 }
2365 out
2366 };
2367
2368 let (mut group_scope, select_all_mapping) = {
2372 let ecx = &ExprContext {
2374 qcx,
2375 name: "GROUP BY clause",
2376 scope: &from_scope,
2377 relation_type: &qcx.relation_type(&relation_expr),
2378 allow_aggregates: false,
2379 allow_subqueries: true,
2380 allow_parameters: true,
2381 allow_windows: false,
2382 };
2383 let mut group_key = vec![];
2384 let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2385 let mut group_hir_exprs = vec![];
2386 let mut group_scope = Scope::empty();
2387 let mut select_all_mapping = BTreeMap::new();
2388
2389 for group_expr in &s.group_by {
2390 let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2391 let new_column = group_key.len();
2392
2393 if let Some(group_expr) = group_expr {
2394 if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2398 existing_scope_item.exprs.insert(group_expr.clone());
2399 continue;
2400 }
2401 }
2402
2403 let mut scope_item = if let HirScalarExpr::Column(
2404 ColumnRef {
2405 level: 0,
2406 column: old_column,
2407 },
2408 _name,
2409 ) = &expr
2410 {
2411 select_all_mapping.insert(*old_column, new_column);
2417 let scope_item = ecx.scope.items[*old_column].clone();
2418 scope_item
2419 } else {
2420 ScopeItem::empty()
2421 };
2422
2423 if let Some(group_expr) = group_expr.cloned() {
2424 scope_item.exprs.insert(group_expr);
2425 }
2426
2427 group_key.push(from_scope.len() + group_exprs.len());
2428 group_hir_exprs.push(expr.clone());
2429 group_exprs.insert(expr, scope_item);
2430 }
2431
2432 assert_eq!(group_hir_exprs.len(), group_exprs.len());
2433 for expr in &group_hir_exprs {
2434 if let Some(scope_item) = group_exprs.remove(expr) {
2435 group_scope.items.push(scope_item);
2436 }
2437 }
2438
2439 let ecx = &ExprContext {
2441 qcx,
2442 name: "aggregate function",
2443 scope: &from_scope,
2444 relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2445 allow_aggregates: false,
2446 allow_subqueries: true,
2447 allow_parameters: true,
2448 allow_windows: false,
2449 };
2450 let mut agg_exprs = vec![];
2451 for sql_function in aggregates {
2452 if sql_function.over.is_some() {
2453 unreachable!(
2454 "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2455 );
2456 }
2457 agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2458 group_scope
2459 .items
2460 .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2461 }
2462 if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2463 relation_expr = relation_expr.map(group_hir_exprs).reduce(
2465 group_key,
2466 agg_exprs,
2467 group_size_hints.aggregate_input_group_size,
2468 );
2469
2470 for i in 0..from_scope.len() {
2476 if !select_all_mapping.contains_key(&i) {
2477 let scope_item = &ecx.scope.items[i];
2478 group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2479 table_name: scope_item.table_name.clone(),
2480 column_name: scope_item.column_name.clone(),
2481 allow_unqualified_references: scope_item.allow_unqualified_references,
2482 });
2483 }
2484 }
2485
2486 (group_scope, select_all_mapping)
2487 } else {
2488 (
2490 from_scope.clone(),
2491 (0..from_scope.len()).map(|i| (i, i)).collect(),
2492 )
2493 }
2494 };
2495
2496 if let Some(ref having) = s.having {
2498 let ecx = &ExprContext {
2499 qcx,
2500 name: "HAVING clause",
2501 scope: &group_scope,
2502 relation_type: &qcx.relation_type(&relation_expr),
2503 allow_aggregates: true,
2504 allow_subqueries: true,
2505 allow_parameters: true,
2506 allow_windows: false,
2507 };
2508 let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2509 relation_expr = relation_expr.filter(vec![expr]);
2510 }
2511
2512 let window_funcs = {
2525 let mut visitor = WindowFuncCollector::default();
2526 visitor.visit_select(&s);
2530 for o in order_by_exprs.iter() {
2531 visitor.visit_order_by_expr(o);
2532 }
2533 visitor.into_result()
2534 };
2535 for window_func in window_funcs {
2536 let ecx = &ExprContext {
2537 qcx,
2538 name: "window function",
2539 scope: &group_scope,
2540 relation_type: &qcx.relation_type(&relation_expr),
2541 allow_aggregates: true,
2542 allow_subqueries: true,
2543 allow_parameters: true,
2544 allow_windows: true,
2545 };
2546 relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2547 group_scope.items.push(ScopeItem::from_expr(window_func));
2548 }
2549 if let Some(ref qualify) = s.qualify {
2556 let ecx = &ExprContext {
2557 qcx,
2558 name: "QUALIFY clause",
2559 scope: &group_scope,
2560 relation_type: &qcx.relation_type(&relation_expr),
2561 allow_aggregates: true,
2562 allow_subqueries: true,
2563 allow_parameters: true,
2564 allow_windows: true,
2565 };
2566 let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2567 relation_expr = relation_expr.filter(vec![expr]);
2568 }
2569
2570 let output_columns = {
2572 let mut new_exprs = vec![];
2573 let mut new_type = qcx.relation_type(&relation_expr);
2574 let mut output_columns = vec![];
2575 for (select_item, column_name) in &projection {
2576 let ecx = &ExprContext {
2577 qcx,
2578 name: "SELECT clause",
2579 scope: &group_scope,
2580 relation_type: &new_type,
2581 allow_aggregates: true,
2582 allow_subqueries: true,
2583 allow_parameters: true,
2584 allow_windows: true,
2585 };
2586 let expr = match select_item {
2587 ExpandedSelectItem::InputOrdinal(i) => {
2588 if let Some(column) = select_all_mapping.get(i).copied() {
2589 HirScalarExpr::column(column)
2590 } else {
2591 return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2592 }
2593 }
2594 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2595 };
2596 if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2597 output_columns.push((column, column_name));
2599 } else {
2600 let typ = ecx.column_type(&expr);
2607 new_type.column_types.push(typ);
2608 new_exprs.push(expr);
2609 output_columns.push((group_scope.len(), column_name));
2610 group_scope
2611 .items
2612 .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2613 }
2614 }
2615 relation_expr = relation_expr.map(new_exprs);
2616 output_columns
2617 };
2618 let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2619
2620 let order_by = {
2622 let relation_type = qcx.relation_type(&relation_expr);
2623 let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2624 &ExprContext {
2625 qcx,
2626 name: "ORDER BY clause",
2627 scope: &group_scope,
2628 relation_type: &relation_type,
2629 allow_aggregates: true,
2630 allow_subqueries: true,
2631 allow_parameters: true,
2632 allow_windows: true,
2633 },
2634 &order_by_exprs,
2635 &output_columns,
2636 )?;
2637
2638 match s.distinct {
2639 None => relation_expr = relation_expr.map(map_exprs),
2640 Some(Distinct::EntireRow) => {
2641 if relation_type.arity() == 0 {
2642 sql_bail!("SELECT DISTINCT must have at least one column");
2643 }
2644 if !try_push_projection_order_by(
2648 &mut relation_expr,
2649 &mut project_key,
2650 &mut order_by,
2651 ) {
2652 sql_bail!(
2653 "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2654 );
2655 }
2656 assert!(map_exprs.is_empty());
2657 relation_expr = relation_expr.distinct();
2658 }
2659 Some(Distinct::On(exprs)) => {
2660 let ecx = &ExprContext {
2661 qcx,
2662 name: "DISTINCT ON clause",
2663 scope: &group_scope,
2664 relation_type: &qcx.relation_type(&relation_expr),
2665 allow_aggregates: true,
2666 allow_subqueries: true,
2667 allow_parameters: true,
2668 allow_windows: true,
2669 };
2670
2671 let mut distinct_exprs = vec![];
2672 for expr in &exprs {
2673 let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2674 distinct_exprs.push(expr);
2675 }
2676
2677 let mut distinct_key = vec![];
2678
2679 let arity = relation_type.arity();
2689 for ord in order_by.iter().take(distinct_exprs.len()) {
2690 let mut expr = &HirScalarExpr::column(ord.column);
2693 if ord.column >= arity {
2694 expr = &map_exprs[ord.column - arity];
2695 };
2696 match distinct_exprs.iter().position(move |e| e == expr) {
2697 None => sql_bail!(
2698 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2699 ),
2700 Some(pos) => {
2701 distinct_exprs.remove(pos);
2702 }
2703 }
2704 distinct_key.push(ord.column);
2705 }
2706
2707 for expr in distinct_exprs {
2709 let column = match expr {
2712 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2713 _ => {
2714 map_exprs.push(expr);
2715 arity + map_exprs.len() - 1
2716 }
2717 };
2718 distinct_key.push(column);
2719 }
2720
2721 let distinct_len = distinct_key.len();
2726 relation_expr = HirRelationExpr::top_k(
2727 relation_expr.map(map_exprs),
2728 distinct_key,
2729 order_by.iter().skip(distinct_len).cloned().collect(),
2730 Some(HirScalarExpr::literal(
2731 Datum::Int64(1),
2732 SqlScalarType::Int64,
2733 )),
2734 HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2735 group_size_hints.distinct_on_input_group_size,
2736 );
2737 }
2738 }
2739
2740 order_by
2741 };
2742
2743 let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2748
2749 Ok(SelectPlan {
2750 expr: relation_expr,
2751 scope,
2752 order_by,
2753 project: project_key,
2754 })
2755}
2756
2757fn plan_scalar_table_funcs(
2758 qcx: &QueryContext,
2759 table_funcs: BTreeMap<Function<Aug>, String>,
2760 table_func_names: &mut BTreeMap<String, Ident>,
2761 relation_expr: &HirRelationExpr,
2762 from_scope: &Scope,
2763) -> Result<(HirRelationExpr, Scope), PlanError> {
2764 let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2765
2766 for (table_func, id) in table_funcs.iter() {
2767 table_func_names.insert(
2768 id.clone(),
2769 Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2771 );
2772 }
2773 if table_funcs.len() == 1 {
2776 let (table_func, id) = table_funcs.iter().next().unwrap();
2777 let (expr, mut scope) =
2778 plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2779
2780 let num_cols = scope.len();
2782 for i in 0..scope.len() {
2783 scope.items[i].table_name = Some(PartialItemName {
2784 database: None,
2785 schema: None,
2786 item: id.clone(),
2787 });
2788 scope.items[i].from_single_column_function = num_cols == 1;
2789 scope.items[i].allow_unqualified_references = false;
2790 }
2791 return Ok((expr, scope));
2792 }
2793 let (expr, mut scope, num_cols) =
2795 plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2796
2797 let mut i = 0;
2799 for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2800 for _ in 0..num_cols {
2801 scope.items[i].table_name = Some(PartialItemName {
2802 database: None,
2803 schema: None,
2804 item: id.clone(),
2805 });
2806 scope.items[i].from_single_column_function = num_cols == 1;
2807 scope.items[i].allow_unqualified_references = false;
2808 i += 1;
2809 }
2810 scope.items[i].table_name = Some(PartialItemName {
2814 database: None,
2815 schema: None,
2816 item: id.clone(),
2817 });
2818 scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2819 scope.items[i].allow_unqualified_references = false;
2820 i += 1;
2821 }
2822 scope.items[i].allow_unqualified_references = false;
2824 Ok((expr, scope))
2825}
2826
2827fn plan_group_by_expr<'a>(
2834 ecx: &ExprContext,
2835 group_expr: &'a Expr<Aug>,
2836 projection: &'a [(ExpandedSelectItem, ColumnName)],
2837) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2838 let plan_projection = |column: usize| match &projection[column].0 {
2839 ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2840 ExpandedSelectItem::Expr(expr) => {
2841 Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2842 }
2843 };
2844
2845 if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2848 return plan_projection(column);
2849 }
2850
2851 match group_expr {
2855 Expr::Identifier(names) => match plan_identifier(ecx, names) {
2856 Err(PlanError::UnknownColumn {
2857 table: None,
2858 column,
2859 similar,
2860 }) => {
2861 let mut iter = projection.iter().map(|(_expr, name)| name);
2864 if let Some(i) = iter.position(|n| *n == column) {
2865 if iter.any(|n| *n == column) {
2866 Err(PlanError::AmbiguousColumn(column))
2867 } else {
2868 plan_projection(i)
2869 }
2870 } else {
2871 Err(PlanError::UnknownColumn {
2874 table: None,
2875 column,
2876 similar,
2877 })
2878 }
2879 }
2880 res => Ok((Some(group_expr), res?)),
2881 },
2882 _ => Ok((
2883 Some(group_expr),
2884 plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2885 )),
2886 }
2887}
2888
2889pub(crate) fn plan_order_by_exprs(
2897 ecx: &ExprContext,
2898 order_by_exprs: &[OrderByExpr<Aug>],
2899 output_columns: &[(usize, &ColumnName)],
2900) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2901 let mut order_by = vec![];
2902 let mut map_exprs = vec![];
2903 for obe in order_by_exprs {
2904 let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2905 let column = match expr {
2908 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2909 _ => {
2910 map_exprs.push(expr);
2911 ecx.relation_type.arity() + map_exprs.len() - 1
2912 }
2913 };
2914 order_by.push(resolve_desc_and_nulls_last(obe, column));
2915 }
2916 Ok((order_by, map_exprs))
2917}
2918
2919fn plan_order_by_or_distinct_expr(
2937 ecx: &ExprContext,
2938 expr: &Expr<Aug>,
2939 output_columns: &[(usize, &ColumnName)],
2940) -> Result<HirScalarExpr, PlanError> {
2941 if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2942 return Ok(HirScalarExpr::column(output_columns[i].0));
2943 }
2944
2945 if let Expr::Identifier(names) = expr {
2946 if let [name] = &names[..] {
2947 let name = normalize::column_name(name.clone());
2948 let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2949 if let Some((i, _)) = iter.next() {
2950 match iter.next() {
2951 Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2955 _ => return Ok(HirScalarExpr::column(*i)),
2956 }
2957 }
2958 }
2959 }
2960
2961 plan_expr(ecx, expr)?.type_as_any(ecx)
2962}
2963
2964fn plan_table_with_joins(
2965 qcx: &QueryContext,
2966 table_with_joins: &TableWithJoins<Aug>,
2967) -> Result<(HirRelationExpr, Scope), PlanError> {
2968 let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2969 for join in &table_with_joins.joins {
2970 let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2971 expr = new_expr;
2972 scope = new_scope;
2973 }
2974 Ok((expr, scope))
2975}
2976
2977fn plan_table_factor(
2978 qcx: &QueryContext,
2979 table_factor: &TableFactor<Aug>,
2980) -> Result<(HirRelationExpr, Scope), PlanError> {
2981 match table_factor {
2982 TableFactor::Table { name, alias } => {
2983 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2984 let scope = plan_table_alias(scope, alias.as_ref())?;
2985 Ok((expr, scope))
2986 }
2987
2988 TableFactor::Function {
2989 function,
2990 alias,
2991 with_ordinality,
2992 } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
2993
2994 TableFactor::RowsFrom {
2995 functions,
2996 alias,
2997 with_ordinality,
2998 } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
2999
3000 TableFactor::Derived {
3001 lateral,
3002 subquery,
3003 alias,
3004 } => {
3005 let mut qcx = (*qcx).clone();
3006 if !lateral {
3007 for scope in &mut qcx.outer_scopes {
3011 if scope.lateral_barrier {
3012 break;
3013 }
3014 scope.items.clear();
3015 }
3016 }
3017 qcx.outer_scopes[0].lateral_barrier = true;
3018 let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3019 let scope = plan_table_alias(scope, alias.as_ref())?;
3020 Ok((expr, scope))
3021 }
3022
3023 TableFactor::NestedJoin { join, alias } => {
3024 let (expr, scope) = plan_table_with_joins(qcx, join)?;
3025 let scope = plan_table_alias(scope, alias.as_ref())?;
3026 Ok((expr, scope))
3027 }
3028 }
3029}
3030
3031fn plan_rows_from(
3073 qcx: &QueryContext,
3074 functions: &[Function<Aug>],
3075 alias: Option<&TableAlias>,
3076 with_ordinality: bool,
3077) -> Result<(HirRelationExpr, Scope), PlanError> {
3078 if let [function] = functions {
3081 return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3082 }
3083
3084 let (expr, mut scope, num_cols) = plan_rows_from_internal(
3088 qcx,
3089 functions,
3090 Some(functions[0].name.full_item_name().clone()),
3091 )?;
3092
3093 let mut columns = Vec::new();
3095 let mut offset = 0;
3096 for (idx, cols) in num_cols.into_iter().enumerate() {
3098 for i in 0..cols {
3099 columns.push(offset + i);
3100 }
3101 offset += cols + 1;
3102
3103 scope.items.remove(offset - idx - 1);
3106 }
3107
3108 if with_ordinality {
3111 columns.push(offset);
3112 } else {
3113 scope.items.pop();
3114 }
3115
3116 let expr = expr.project(columns);
3117
3118 let scope = plan_table_alias(scope, alias)?;
3119 Ok((expr, scope))
3120}
3121
3122fn plan_rows_from_internal<'a>(
3145 qcx: &QueryContext,
3146 functions: impl IntoIterator<Item = &'a Function<Aug>>,
3147 table_name: Option<FullItemName>,
3148) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3149 let mut functions = functions.into_iter();
3150 let mut num_cols = Vec::new();
3151
3152 let (mut left_expr, mut left_scope) =
3156 plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3157 num_cols.push(left_scope.len() - 1);
3158 left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3160 left_scope
3161 .items
3162 .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));
3163
3164 for function in functions {
3165 let qcx = qcx.empty_derived_context();
3167 let (right_expr, mut right_scope) =
3168 plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3169 num_cols.push(right_scope.len() - 1);
3170 let left_col = left_scope.len() - 1;
3171 let right_col = left_scope.len() + right_scope.len() - 1;
3172 let on = HirScalarExpr::call_binary(
3173 HirScalarExpr::column(left_col),
3174 HirScalarExpr::column(right_col),
3175 expr_func::Eq,
3176 );
3177 left_expr = left_expr
3178 .join(right_expr, on, JoinKind::FullOuter)
3179 .map(vec![HirScalarExpr::call_variadic(
3180 VariadicFunc::Coalesce,
3181 vec![
3182 HirScalarExpr::column(left_col),
3183 HirScalarExpr::column(right_col),
3184 ],
3185 )]);
3186
3187 left_expr = left_expr.project(
3190 (0..left_col) .chain(left_col + 1..right_col + 2) .collect(),
3193 );
3194 right_scope.items.push(left_scope.items.pop().unwrap());
3196
3197 left_scope.items.extend(right_scope.items);
3198 }
3199
3200 Ok((left_expr, left_scope, num_cols))
3201}
3202
3203fn plan_solitary_table_function(
3207 qcx: &QueryContext,
3208 function: &Function<Aug>,
3209 alias: Option<&TableAlias>,
3210 with_ordinality: bool,
3211) -> Result<(HirRelationExpr, Scope), PlanError> {
3212 let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3213
3214 let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3215 if single_column_function {
3216 let item = &mut scope.items[0];
3217
3218 item.from_single_column_function = true;
3221
3222 if let Some(alias) = alias {
3237 if let ScopeItem {
3238 table_name: Some(table_name),
3239 column_name,
3240 ..
3241 } = item
3242 {
3243 if table_name.item.as_str() == column_name.as_str() {
3244 *column_name = normalize::column_name(alias.name.clone());
3245 }
3246 }
3247 }
3248 }
3249
3250 let scope = plan_table_alias(scope, alias)?;
3251 Ok((expr, scope))
3252}
3253
3254fn plan_table_function_internal(
3259 qcx: &QueryContext,
3260 Function {
3261 name,
3262 args,
3263 filter,
3264 over,
3265 distinct,
3266 }: &Function<Aug>,
3267 with_ordinality: bool,
3268 table_name: Option<FullItemName>,
3269) -> Result<(HirRelationExpr, Scope), PlanError> {
3270 assert_none!(filter, "cannot parse table function with FILTER");
3271 assert_none!(over, "cannot parse table function with OVER");
3272 assert!(!*distinct, "cannot parse table function with DISTINCT");
3273
3274 let ecx = &ExprContext {
3275 qcx,
3276 name: "table function arguments",
3277 scope: &Scope::empty(),
3278 relation_type: &SqlRelationType::empty(),
3279 allow_aggregates: false,
3280 allow_subqueries: true,
3281 allow_parameters: true,
3282 allow_windows: false,
3283 };
3284
3285 let scalar_args = match args {
3286 FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3287 FunctionArgs::Args { args, order_by } => {
3288 if !order_by.is_empty() {
3289 sql_bail!(
3290 "ORDER BY specified, but {} is not an aggregate function",
3291 name
3292 );
3293 }
3294 plan_exprs(ecx, args)?
3295 }
3296 };
3297
3298 let table_name = match table_name {
3299 Some(table_name) => table_name.item,
3300 None => name.full_item_name().item.clone(),
3301 };
3302
3303 let scope_name = Some(PartialItemName {
3304 database: None,
3305 schema: None,
3306 item: table_name,
3307 });
3308
3309 let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3310 Func::Table(impls) => {
3311 let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3312 let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3313 let expr = match tf.imp {
3314 TableFuncImpl::CallTable { mut func, exprs } => {
3315 if with_ordinality {
3316 func = TableFunc::with_ordinality(func.clone()).ok_or(
3317 PlanError::Unsupported {
3318 feature: format!("WITH ORDINALITY on {}", func),
3319 discussion_no: None,
3320 },
3321 )?;
3322 }
3323 HirRelationExpr::CallTable { func, exprs }
3324 }
3325 TableFuncImpl::Expr(expr) => {
3326 if !with_ordinality {
3327 expr
3328 } else {
3329 if qcx
3333 .scx
3334 .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3335 {
3336 tracing::error!(
3340 %name,
3341 "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3342 );
3343 expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3344 func: WindowExprType::Scalar(ScalarWindowExpr {
3345 func: ScalarWindowFunc::RowNumber,
3346 order_by: vec![],
3347 }),
3348 partition_by: vec![],
3349 order_by: vec![],
3350 })])
3351 } else {
3352 bail_unsupported!(format!(
3353 "WITH ORDINALITY or ROWS FROM with {}",
3354 name
3355 ));
3356 }
3357 }
3358 }
3359 };
3360 (expr, scope)
3361 }
3362 Func::Scalar(impls) => {
3363 let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3364 let output = expr.typ(
3365 &qcx.outer_relation_types,
3366 &SqlRelationType::new(vec![]),
3367 &qcx.scx.param_types.borrow(),
3368 );
3369
3370 let relation = SqlRelationType::new(vec![output]);
3371
3372 let function_ident = Ident::new(name.full_item_name().item.clone())?;
3373 let column_name = normalize::column_name(function_ident);
3374 let name = column_name.to_string();
3375
3376 let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3377
3378 let mut func = TableFunc::TabletizedScalar { relation, name };
3379 if with_ordinality {
3380 func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3381 feature: format!("WITH ORDINALITY on {}", func),
3382 discussion_no: None,
3383 })?;
3384 }
3385 (
3386 HirRelationExpr::CallTable {
3387 func,
3388 exprs: vec![expr],
3389 },
3390 scope,
3391 )
3392 }
3393 o => sql_bail!(
3394 "{} functions are not supported in functions in FROM",
3395 o.class()
3396 ),
3397 };
3398
3399 if with_ordinality {
3400 scope
3401 .items
3402 .push(ScopeItem::from_name(scope_name, "ordinality"));
3403 }
3404
3405 Ok((expr, scope))
3406}
3407
3408fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3409 if let Some(TableAlias {
3410 name,
3411 columns,
3412 strict,
3413 }) = alias
3414 {
3415 if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3416 sql_bail!(
3417 "{} has {} columns available but {} columns specified",
3418 name,
3419 scope.items.len(),
3420 columns.len()
3421 );
3422 }
3423
3424 let table_name = normalize::ident(name.to_owned());
3425 for (i, item) in scope.items.iter_mut().enumerate() {
3426 item.table_name = if item.allow_unqualified_references {
3427 Some(PartialItemName {
3428 database: None,
3429 schema: None,
3430 item: table_name.clone(),
3431 })
3432 } else {
3433 None
3467 };
3468 item.column_name = columns
3469 .get(i)
3470 .map(|a| normalize::column_name(a.clone()))
3471 .unwrap_or_else(|| item.column_name.clone());
3472 }
3473 }
3474 Ok(scope)
3475}
3476
3477fn invent_column_name(
3481 ecx: &ExprContext,
3482 expr: &Expr<Aug>,
3483 table_func_names: &BTreeMap<String, Ident>,
3484) -> Option<ColumnName> {
3485 #[derive(Debug)]
3492 enum NameQuality {
3493 Low,
3494 High,
3495 }
3496
3497 fn invent(
3498 ecx: &ExprContext,
3499 expr: &Expr<Aug>,
3500 table_func_names: &BTreeMap<String, Ident>,
3501 ) -> Option<(ColumnName, NameQuality)> {
3502 match expr {
3503 Expr::Identifier(names) => {
3504 if let [name] = names.as_slice() {
3505 if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3506 return Some((
3507 normalize::column_name(table_func_name.clone()),
3508 NameQuality::High,
3509 ));
3510 }
3511 }
3512 names
3513 .last()
3514 .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3515 }
3516 Expr::Value(v) => match v {
3517 Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3520 Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3521 _ => None,
3522 },
3523 Expr::Function(func) => {
3524 let (schema, item) = match &func.name {
3525 ResolvedItemName::Item {
3526 qualifiers,
3527 full_name,
3528 ..
3529 } => (&qualifiers.schema_spec, full_name.item.clone()),
3530 _ => unreachable!(),
3531 };
3532
3533 if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3534 || schema
3535 == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3536 {
3537 None
3538 } else {
3539 Some((item.into(), NameQuality::High))
3540 }
3541 }
3542 Expr::HomogenizingFunction { function, .. } => Some((
3543 function.to_string().to_lowercase().into(),
3544 NameQuality::High,
3545 )),
3546 Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3547 Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3548 Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3549 Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3550 Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
3551 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3552 _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3553 },
3554 Expr::Case { else_result, .. } => {
3555 match else_result
3556 .as_ref()
3557 .and_then(|else_result| invent(ecx, else_result, table_func_names))
3558 {
3559 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3560 _ => Some(("case".into(), NameQuality::Low)),
3561 }
3562 }
3563 Expr::FieldAccess { field, .. } => {
3564 Some((normalize::column_name(field.clone()), NameQuality::High))
3565 }
3566 Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3567 Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
3568 Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3569 let (_expr, scope) =
3573 plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
3574 scope
3575 .items
3576 .first()
3577 .map(|name| (name.column_name.clone(), NameQuality::High))
3578 }
3579 Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3580 _ => None,
3581 }
3582 }
3583
3584 invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
3585}
3586
3587#[derive(Debug)]
3588enum ExpandedSelectItem<'a> {
3589 InputOrdinal(usize),
3590 Expr(Cow<'a, Expr<Aug>>),
3591}
3592
3593impl ExpandedSelectItem<'_> {
3594 fn as_expr(&self) -> Option<&Expr<Aug>> {
3595 match self {
3596 ExpandedSelectItem::InputOrdinal(_) => None,
3597 ExpandedSelectItem::Expr(expr) => Some(expr),
3598 }
3599 }
3600}
3601
3602fn expand_select_item<'a>(
3603 ecx: &ExprContext,
3604 s: &'a SelectItem<Aug>,
3605 table_func_names: &BTreeMap<String, Ident>,
3606) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3607 match s {
3608 SelectItem::Expr {
3609 expr: Expr::QualifiedWildcard(table_name),
3610 alias: _,
3611 } => {
3612 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3613 let table_name =
3614 normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3615 let out: Vec<_> = ecx
3616 .scope
3617 .items
3618 .iter()
3619 .enumerate()
3620 .filter(|(_i, item)| item.is_from_table(&table_name))
3621 .map(|(i, item)| {
3622 let name = item.column_name.clone();
3623 (ExpandedSelectItem::InputOrdinal(i), name)
3624 })
3625 .collect();
3626 if out.is_empty() {
3627 sql_bail!("no table named '{}' in scope", table_name);
3628 }
3629 Ok(out)
3630 }
3631 SelectItem::Expr {
3632 expr: Expr::WildcardAccess(sql_expr),
3633 alias: _,
3634 } => {
3635 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3636 let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3642 let fields = match ecx.scalar_type(&expr) {
3643 SqlScalarType::Record { fields, .. } => fields,
3644 ty => sql_bail!(
3645 "type {} is not composite",
3646 ecx.humanize_scalar_type(&ty, false)
3647 ),
3648 };
3649 let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3650 if let Expr::Identifier(ident) = sql_expr.as_ref() {
3651 if let [name] = ident.as_slice() {
3652 if let Ok(items) = ecx.scope.items_from_table(
3653 &[],
3654 &PartialItemName {
3655 database: None,
3656 schema: None,
3657 item: name.as_str().to_string(),
3658 },
3659 ) {
3660 for (_, item) in items {
3661 if item
3662 .is_exists_column_for_a_table_function_that_was_in_the_target_list
3663 {
3664 skip_cols.insert(item.column_name.clone());
3665 }
3666 }
3667 }
3668 }
3669 }
3670 let items = fields
3671 .iter()
3672 .filter_map(|(name, _ty)| {
3673 if skip_cols.contains(name) {
3674 None
3675 } else {
3676 let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3677 expr: sql_expr.clone(),
3678 field: name.clone().into(),
3679 }));
3680 Some((item, name.clone()))
3681 }
3682 })
3683 .collect();
3684 Ok(items)
3685 }
3686 SelectItem::Wildcard => {
3687 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3688 let items: Vec<_> = ecx
3689 .scope
3690 .items
3691 .iter()
3692 .enumerate()
3693 .filter(|(_i, item)| item.allow_unqualified_references)
3694 .map(|(i, item)| {
3695 let name = item.column_name.clone();
3696 (ExpandedSelectItem::InputOrdinal(i), name)
3697 })
3698 .collect();
3699
3700 Ok(items)
3701 }
3702 SelectItem::Expr { expr, alias } => {
3703 let name = alias
3704 .clone()
3705 .map(normalize::column_name)
3706 .or_else(|| invent_column_name(ecx, expr, table_func_names))
3707 .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into());
3708 Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3709 }
3710 }
3711}
3712
3713fn plan_join(
3714 left_qcx: &QueryContext,
3715 left: HirRelationExpr,
3716 left_scope: Scope,
3717 join: &Join<Aug>,
3718) -> Result<(HirRelationExpr, Scope), PlanError> {
3719 const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3720 let (kind, constraint) = match &join.join_operator {
3721 JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3722 JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3723 JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3724 JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3725 JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3726 };
3727
3728 let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3729 if !kind.can_be_correlated() {
3730 for item in &mut right_qcx.outer_scopes[0].items {
3731 item.error_if_referenced =
3736 Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3737 table: table.cloned(),
3738 column: column.clone(),
3739 });
3740 }
3741 }
3742 let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3743
3744 let (expr, scope) = match constraint {
3745 JoinConstraint::On(expr) => {
3746 let product_scope = left_scope.product(right_scope)?;
3747 let ecx = &ExprContext {
3748 qcx: left_qcx,
3749 name: "ON clause",
3750 scope: &product_scope,
3751 relation_type: &SqlRelationType::new(
3752 left_qcx
3753 .relation_type(&left)
3754 .column_types
3755 .into_iter()
3756 .chain(right_qcx.relation_type(&right).column_types)
3757 .collect(),
3758 ),
3759 allow_aggregates: false,
3760 allow_subqueries: true,
3761 allow_parameters: true,
3762 allow_windows: false,
3763 };
3764 let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3765 let joined = left.join(right, on, kind);
3766 (joined, product_scope)
3767 }
3768 JoinConstraint::Using { columns, alias } => {
3769 let column_names = columns
3770 .iter()
3771 .map(|ident| normalize::column_name(ident.clone()))
3772 .collect::<Vec<_>>();
3773
3774 plan_using_constraint(
3775 &column_names,
3776 left_qcx,
3777 left,
3778 left_scope,
3779 &right_qcx,
3780 right,
3781 right_scope,
3782 kind,
3783 alias.as_ref(),
3784 )?
3785 }
3786 JoinConstraint::Natural => {
3787 *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3790 *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3791 let left_column_names = left_scope.column_names();
3792 let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3793 let column_names: Vec<_> = left_column_names
3794 .filter(|col| right_column_names.contains(col))
3795 .cloned()
3796 .collect();
3797 plan_using_constraint(
3798 &column_names,
3799 left_qcx,
3800 left,
3801 left_scope,
3802 &right_qcx,
3803 right,
3804 right_scope,
3805 kind,
3806 None,
3807 )?
3808 }
3809 };
3810 Ok((expr, scope))
3811}
3812
3813#[allow(clippy::too_many_arguments)]
3815fn plan_using_constraint(
3816 column_names: &[ColumnName],
3817 left_qcx: &QueryContext,
3818 left: HirRelationExpr,
3819 left_scope: Scope,
3820 right_qcx: &QueryContext,
3821 right: HirRelationExpr,
3822 right_scope: Scope,
3823 kind: JoinKind,
3824 alias: Option<&Ident>,
3825) -> Result<(HirRelationExpr, Scope), PlanError> {
3826 let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3827
3828 let mut unique_column_names = BTreeSet::new();
3831 for c in column_names {
3832 if !unique_column_names.insert(c) {
3833 return Err(PlanError::Unsupported {
3834 feature: format!(
3835 "column name {} appears more than once in USING clause",
3836 c.quoted()
3837 ),
3838 discussion_no: None,
3839 });
3840 }
3841 }
3842
3843 let alias_item_name = alias.map(|alias| PartialItemName {
3844 database: None,
3845 schema: None,
3846 item: alias.clone().to_string(),
3847 });
3848
3849 if let Some(alias_item_name) = &alias_item_name {
3850 for partial_item_name in both_scope.table_names() {
3851 if partial_item_name.matches(alias_item_name) {
3852 sql_bail!(
3853 "table name \"{}\" specified more than once",
3854 alias_item_name
3855 )
3856 }
3857 }
3858 }
3859
3860 let ecx = &ExprContext {
3861 qcx: right_qcx,
3862 name: "USING clause",
3863 scope: &both_scope,
3864 relation_type: &SqlRelationType::new(
3865 left_qcx
3866 .relation_type(&left)
3867 .column_types
3868 .into_iter()
3869 .chain(right_qcx.relation_type(&right).column_types)
3870 .collect(),
3871 ),
3872 allow_aggregates: false,
3873 allow_subqueries: false,
3874 allow_parameters: false,
3875 allow_windows: false,
3876 };
3877
3878 let mut join_exprs = vec![];
3879 let mut map_exprs = vec![];
3880 let mut new_items = vec![];
3881 let mut join_cols = vec![];
3882 let mut hidden_cols = vec![];
3883
3884 for column_name in column_names {
3885 let (lhs, lhs_name) = left_scope.resolve_using_column(
3887 column_name,
3888 JoinSide::Left,
3889 &mut left_qcx.name_manager.borrow_mut(),
3890 )?;
3891 let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3892 column_name,
3893 JoinSide::Right,
3894 &mut right_qcx.name_manager.borrow_mut(),
3895 )?;
3896
3897 rhs.column += left_scope.len();
3899
3900 let mut exprs = coerce_homogeneous_exprs(
3902 &ecx.with_name(&format!(
3903 "NATURAL/USING join column {}",
3904 column_name.quoted()
3905 )),
3906 vec![
3907 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3908 lhs,
3909 Arc::clone(&lhs_name),
3910 )),
3911 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3912 rhs,
3913 Arc::clone(&rhs_name),
3914 )),
3915 ],
3916 None,
3917 )?;
3918 let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3919
3920 match kind {
3921 JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3922 join_cols.push(lhs.column);
3923 hidden_cols.push(rhs.column);
3924 }
3925 JoinKind::RightOuter => {
3926 join_cols.push(rhs.column);
3927 hidden_cols.push(lhs.column);
3928 }
3929 JoinKind::FullOuter => {
3930 join_cols.push(both_scope.items.len() + map_exprs.len());
3933 hidden_cols.push(lhs.column);
3934 hidden_cols.push(rhs.column);
3935 map_exprs.push(HirScalarExpr::call_variadic(
3936 VariadicFunc::Coalesce,
3937 vec![expr1.clone(), expr2.clone()],
3938 ));
3939 new_items.push(ScopeItem::from_column_name(column_name));
3940 }
3941 }
3942
3943 if alias_item_name.is_some() {
3948 let new_item_col = both_scope.items.len() + new_items.len();
3949 join_cols.push(new_item_col);
3950 hidden_cols.push(new_item_col);
3951
3952 new_items.push(ScopeItem::from_name(
3953 alias_item_name.clone(),
3954 column_name.clone().to_string(),
3955 ));
3956
3957 map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
3961 }
3962
3963 join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
3964 }
3965 both_scope.items.extend(new_items);
3966
3967 for c in hidden_cols {
3971 both_scope.items[c].allow_unqualified_references = false;
3972 }
3973
3974 let project_key = join_cols
3976 .into_iter()
3977 .chain(0..both_scope.items.len())
3978 .unique()
3979 .collect::<Vec<_>>();
3980
3981 both_scope = both_scope.project(&project_key);
3982
3983 let on = HirScalarExpr::variadic_and(join_exprs);
3984
3985 let both = left
3986 .join(right, on, kind)
3987 .map(map_exprs)
3988 .project(project_key);
3989 Ok((both, both_scope))
3990}
3991
3992pub fn plan_expr<'a>(
3993 ecx: &'a ExprContext,
3994 e: &Expr<Aug>,
3995) -> Result<CoercibleScalarExpr, PlanError> {
3996 ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
3997}
3998
3999fn plan_expr_inner<'a>(
4000 ecx: &'a ExprContext,
4001 e: &Expr<Aug>,
4002) -> Result<CoercibleScalarExpr, PlanError> {
4003 if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4004 return Ok(HirScalarExpr::named_column(
4006 i,
4007 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4008 )
4009 .into());
4010 }
4011
4012 match e {
4013 Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4015 Ok(plan_identifier(ecx, names)?.into())
4016 }
4017
4018 Expr::Value(val) => plan_literal(val),
4020 Expr::Parameter(n) => plan_parameter(ecx, *n),
4021 Expr::Array(exprs) => plan_array(ecx, exprs, None),
4022 Expr::List(exprs) => plan_list(ecx, exprs, None),
4023 Expr::Map(exprs) => plan_map(ecx, exprs, None),
4024 Expr::Row { exprs } => plan_row(ecx, exprs),
4025
4026 Expr::Op { op, expr1, expr2 } => {
4028 Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4029 }
4030 Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4031 Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4032
4033 Expr::Not { expr } => plan_not(ecx, expr),
4035 Expr::And { left, right } => plan_and(ecx, left, right),
4036 Expr::Or { left, right } => plan_or(ecx, left, right),
4037 Expr::IsExpr {
4038 expr,
4039 construct,
4040 negated,
4041 } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4042 Expr::Case {
4043 operand,
4044 conditions,
4045 results,
4046 else_result,
4047 } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4048 Expr::HomogenizingFunction { function, exprs } => {
4049 plan_homogenizing_function(ecx, function, exprs)
4050 }
4051 Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4052 ecx,
4053 &None,
4054 &[l_expr.clone().equals(*r_expr.clone())],
4055 &[Expr::null()],
4056 &Some(Box::new(*l_expr.clone())),
4057 )?
4058 .into()),
4059 Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4060 Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4061 Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4062 Expr::Like {
4063 expr,
4064 pattern,
4065 escape,
4066 case_insensitive,
4067 negated,
4068 } => Ok(plan_like(
4069 ecx,
4070 expr,
4071 pattern,
4072 escape.as_deref(),
4073 *case_insensitive,
4074 *negated,
4075 )?
4076 .into()),
4077
4078 Expr::InList {
4079 expr,
4080 list,
4081 negated,
4082 } => plan_in_list(ecx, expr, list, negated),
4083
4084 Expr::Exists(query) => plan_exists(ecx, query),
4086 Expr::Subquery(query) => plan_subquery(ecx, query),
4087 Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4088 Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4089 Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4090 Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4091 Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
4092 Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
4093 Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
4094 Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
4095 Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
4096 Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
4097 Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
4098 }
4099}
4100
4101fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4102 if !ecx.allow_parameters {
4103 return Err(PlanError::UnknownParameter(n));
4107 }
4108 if n == 0 || n > 65536 {
4109 return Err(PlanError::UnknownParameter(n));
4110 }
4111 if ecx.param_types().borrow().contains_key(&n) {
4112 Ok(HirScalarExpr::parameter(n).into())
4113 } else {
4114 Ok(CoercibleScalarExpr::Parameter(n))
4115 }
4116}
4117
4118fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4119 let mut out = vec![];
4120 for e in exprs {
4121 out.push(plan_expr(ecx, e)?);
4122 }
4123 Ok(CoercibleScalarExpr::LiteralRecord(out))
4124}
4125
4126fn plan_cast(
4127 ecx: &ExprContext,
4128 expr: &Expr<Aug>,
4129 data_type: &ResolvedDataType,
4130) -> Result<CoercibleScalarExpr, PlanError> {
4131 let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4132 let expr = match expr {
4133 Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4142 Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4143 Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4144 _ => plan_expr(ecx, expr)?,
4145 };
4146 let ecx = &ecx.with_name("CAST");
4147 let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4148 let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4149 Ok(expr.into())
4150}
4151
4152fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4153 let ecx = ecx.with_name("NOT argument");
4154 Ok(plan_expr(&ecx, expr)?
4155 .type_as(&ecx, &SqlScalarType::Bool)?
4156 .call_unary(UnaryFunc::Not(expr_func::Not))
4157 .into())
4158}
4159
4160fn plan_and(
4161 ecx: &ExprContext,
4162 left: &Expr<Aug>,
4163 right: &Expr<Aug>,
4164) -> Result<CoercibleScalarExpr, PlanError> {
4165 let ecx = ecx.with_name("AND argument");
4166 Ok(HirScalarExpr::variadic_and(vec![
4167 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4168 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4169 ])
4170 .into())
4171}
4172
4173fn plan_or(
4174 ecx: &ExprContext,
4175 left: &Expr<Aug>,
4176 right: &Expr<Aug>,
4177) -> Result<CoercibleScalarExpr, PlanError> {
4178 let ecx = ecx.with_name("OR argument");
4179 Ok(HirScalarExpr::variadic_or(vec![
4180 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4181 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4182 ])
4183 .into())
4184}
4185
4186fn plan_in_list(
4187 ecx: &ExprContext,
4188 lhs: &Expr<Aug>,
4189 list: &Vec<Expr<Aug>>,
4190 negated: &bool,
4191) -> Result<CoercibleScalarExpr, PlanError> {
4192 let ecx = ecx.with_name("IN list");
4193 let or = HirScalarExpr::variadic_or(
4194 list.into_iter()
4195 .map(|e| {
4196 let eq = lhs.clone().equals(e.clone());
4197 plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4198 })
4199 .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4200 );
4201 Ok(if *negated {
4202 or.call_unary(UnaryFunc::Not(expr_func::Not))
4203 } else {
4204 or
4205 }
4206 .into())
4207}
4208
4209fn plan_homogenizing_function(
4210 ecx: &ExprContext,
4211 function: &HomogenizingFunction,
4212 exprs: &[Expr<Aug>],
4213) -> Result<CoercibleScalarExpr, PlanError> {
4214 assert!(!exprs.is_empty()); let expr = HirScalarExpr::call_variadic(
4216 match function {
4217 HomogenizingFunction::Coalesce => VariadicFunc::Coalesce,
4218 HomogenizingFunction::Greatest => VariadicFunc::Greatest,
4219 HomogenizingFunction::Least => VariadicFunc::Least,
4220 },
4221 coerce_homogeneous_exprs(
4222 &ecx.with_name(&function.to_string().to_lowercase()),
4223 plan_exprs(ecx, exprs)?,
4224 None,
4225 )?,
4226 );
4227 Ok(expr.into())
4228}
4229
4230fn plan_field_access(
4231 ecx: &ExprContext,
4232 expr: &Expr<Aug>,
4233 field: &Ident,
4234) -> Result<CoercibleScalarExpr, PlanError> {
4235 let field = normalize::column_name(field.clone());
4236 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4237 let ty = ecx.scalar_type(&expr);
4238 let i = match &ty {
4239 SqlScalarType::Record { fields, .. } => {
4240 fields.iter().position(|(name, _ty)| *name == field)
4241 }
4242 ty => sql_bail!(
4243 "column notation applied to type {}, which is not a composite type",
4244 ecx.humanize_scalar_type(ty, false)
4245 ),
4246 };
4247 match i {
4248 None => sql_bail!(
4249 "field {} not found in data type {}",
4250 field,
4251 ecx.humanize_scalar_type(&ty, false)
4252 ),
4253 Some(i) => Ok(expr
4254 .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4255 .into()),
4256 }
4257}
4258
4259fn plan_subscript(
4260 ecx: &ExprContext,
4261 expr: &Expr<Aug>,
4262 positions: &[SubscriptPosition<Aug>],
4263) -> Result<CoercibleScalarExpr, PlanError> {
4264 assert!(
4265 !positions.is_empty(),
4266 "subscript expression must contain at least one position"
4267 );
4268
4269 let ecx = &ecx.with_name("subscripting");
4270 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4271 let ty = ecx.scalar_type(&expr);
4272 match &ty {
4273 SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4274 ecx,
4275 expr,
4276 positions,
4277 if ty == SqlScalarType::Int2Vector {
4281 1
4282 } else {
4283 0
4284 },
4285 ),
4286 SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4287 SqlScalarType::List { element_type, .. } => {
4288 let elem_type_name = ecx.humanize_scalar_type(element_type, false);
4290 let n_layers = ty.unwrap_list_n_layers();
4291 plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4292 }
4293 ty => sql_bail!(
4294 "cannot subscript type {}",
4295 ecx.humanize_scalar_type(ty, false)
4296 ),
4297 }
4298}
4299
4300fn extract_scalar_subscript_from_positions<'a>(
4304 positions: &'a [SubscriptPosition<Aug>],
4305 expr_type_name: &str,
4306) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4307 let mut scalar_subscripts = Vec::with_capacity(positions.len());
4308 for p in positions {
4309 if p.explicit_slice {
4310 sql_bail!("{} subscript does not support slices", expr_type_name);
4311 }
4312 assert!(
4313 p.end.is_none(),
4314 "index-appearing subscripts cannot have end value"
4315 );
4316 scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4317 }
4318 Ok(scalar_subscripts)
4319}
4320
4321fn plan_subscript_array(
4322 ecx: &ExprContext,
4323 expr: HirScalarExpr,
4324 positions: &[SubscriptPosition<Aug>],
4325 offset: i64,
4326) -> Result<CoercibleScalarExpr, PlanError> {
4327 let mut exprs = Vec::with_capacity(positions.len() + 1);
4328 exprs.push(expr);
4329
4330 let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4333
4334 for i in indexes {
4335 exprs.push(plan_expr(ecx, i)?.cast_to(
4336 ecx,
4337 CastContext::Explicit,
4338 &SqlScalarType::Int64,
4339 )?);
4340 }
4341
4342 Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayIndex { offset }, exprs).into())
4343}
4344
4345fn plan_subscript_list(
4346 ecx: &ExprContext,
4347 mut expr: HirScalarExpr,
4348 positions: &[SubscriptPosition<Aug>],
4349 mut remaining_layers: usize,
4350 elem_type_name: &str,
4351) -> Result<CoercibleScalarExpr, PlanError> {
4352 let mut i = 0;
4353
4354 while i < positions.len() {
4355 let j = positions[i..]
4357 .iter()
4358 .position(|p| p.explicit_slice)
4359 .unwrap_or(positions.len() - i);
4360 if j != 0 {
4361 let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4362 let (n, e) = plan_index_list(
4363 ecx,
4364 expr,
4365 indexes.as_slice(),
4366 remaining_layers,
4367 elem_type_name,
4368 )?;
4369 remaining_layers = n;
4370 expr = e;
4371 i += j;
4372 }
4373
4374 let j = positions[i..]
4376 .iter()
4377 .position(|p| !p.explicit_slice)
4378 .unwrap_or(positions.len() - i);
4379 if j != 0 {
4380 expr = plan_slice_list(
4381 ecx,
4382 expr,
4383 &positions[i..i + j],
4384 remaining_layers,
4385 elem_type_name,
4386 )?;
4387 i += j;
4388 }
4389 }
4390
4391 Ok(expr.into())
4392}
4393
4394fn plan_index_list(
4395 ecx: &ExprContext,
4396 expr: HirScalarExpr,
4397 indexes: &[&Expr<Aug>],
4398 n_layers: usize,
4399 elem_type_name: &str,
4400) -> Result<(usize, HirScalarExpr), PlanError> {
4401 let depth = indexes.len();
4402
4403 if depth > n_layers {
4404 if n_layers == 0 {
4405 sql_bail!("cannot subscript type {}", elem_type_name)
4406 } else {
4407 sql_bail!(
4408 "cannot index into {} layers; list only has {} layer{}",
4409 depth,
4410 n_layers,
4411 if n_layers == 1 { "" } else { "s" }
4412 )
4413 }
4414 }
4415
4416 let mut exprs = Vec::with_capacity(depth + 1);
4417 exprs.push(expr);
4418
4419 for i in indexes {
4420 exprs.push(plan_expr(ecx, i)?.cast_to(
4421 ecx,
4422 CastContext::Explicit,
4423 &SqlScalarType::Int64,
4424 )?);
4425 }
4426
4427 Ok((
4428 n_layers - depth,
4429 HirScalarExpr::call_variadic(VariadicFunc::ListIndex, exprs),
4430 ))
4431}
4432
4433fn plan_slice_list(
4434 ecx: &ExprContext,
4435 expr: HirScalarExpr,
4436 slices: &[SubscriptPosition<Aug>],
4437 n_layers: usize,
4438 elem_type_name: &str,
4439) -> Result<HirScalarExpr, PlanError> {
4440 if n_layers == 0 {
4441 sql_bail!("cannot subscript type {}", elem_type_name)
4442 }
4443
4444 let mut exprs = Vec::with_capacity(slices.len() + 1);
4446 exprs.push(expr);
4447 let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4449 Ok(match position {
4450 Some(p) => {
4451 plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4452 }
4453 None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4454 })
4455 };
4456 for p in slices {
4457 let start = extract_position_or_default(p.start.as_ref(), 1)?;
4458 let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4459 exprs.push(start);
4460 exprs.push(end);
4461 }
4462
4463 Ok(HirScalarExpr::call_variadic(
4464 VariadicFunc::ListSliceLinear,
4465 exprs,
4466 ))
4467}
4468
4469fn plan_like(
4470 ecx: &ExprContext,
4471 expr: &Expr<Aug>,
4472 pattern: &Expr<Aug>,
4473 escape: Option<&Expr<Aug>>,
4474 case_insensitive: bool,
4475 not: bool,
4476) -> Result<HirScalarExpr, PlanError> {
4477 use CastContext::Implicit;
4478 let ecx = ecx.with_name("LIKE argument");
4479 let expr = plan_expr(&ecx, expr)?;
4480 let haystack = match ecx.scalar_type(&expr) {
4481 CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4482 .type_as(&ecx, ty)?
4483 .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4484 _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4485 };
4486 let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4487 if let Some(escape) = escape {
4488 pattern = pattern.call_binary(
4489 plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4490 expr_func::LikeEscape,
4491 );
4492 }
4493 let like = haystack.call_binary(pattern, BinaryFunc::IsLikeMatch { case_insensitive });
4494 if not {
4495 Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4496 } else {
4497 Ok(like)
4498 }
4499}
4500
4501fn plan_subscript_jsonb(
4502 ecx: &ExprContext,
4503 expr: HirScalarExpr,
4504 positions: &[SubscriptPosition<Aug>],
4505) -> Result<CoercibleScalarExpr, PlanError> {
4506 use CastContext::Implicit;
4507 use SqlScalarType::{Int64, String};
4508
4509 let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4512
4513 let mut exprs = Vec::with_capacity(subscripts.len());
4514 for s in subscripts {
4515 let subscript = plan_expr(ecx, s)?;
4516 let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4517 subscript
4518 } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4519 typeconv::to_string(ecx, subscript)
4523 } else {
4524 sql_bail!("jsonb subscript type must be coercible to integer or text");
4525 };
4526 exprs.push(subscript);
4527 }
4528
4529 let expr = expr.call_binary(
4532 HirScalarExpr::call_variadic(
4533 VariadicFunc::ArrayCreate {
4534 elem_type: SqlScalarType::String,
4535 },
4536 exprs,
4537 ),
4538 BinaryFunc::JsonbGetPath,
4539 );
4540 Ok(expr.into())
4541}
4542
4543fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4544 if !ecx.allow_subqueries {
4545 sql_bail!("{} does not allow subqueries", ecx.name)
4546 }
4547 let mut qcx = ecx.derived_query_context();
4548 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4549 Ok(expr.exists().into())
4550}
4551
4552fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4553 if !ecx.allow_subqueries {
4554 sql_bail!("{} does not allow subqueries", ecx.name)
4555 }
4556 let mut qcx = ecx.derived_query_context();
4557 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4558 let column_types = qcx.relation_type(&expr).column_types;
4559 if column_types.len() != 1 {
4560 sql_bail!(
4561 "Expected subselect to return 1 column, got {} columns",
4562 column_types.len()
4563 );
4564 }
4565 Ok(expr.select().into())
4566}
4567
4568fn plan_list_subquery(
4569 ecx: &ExprContext,
4570 query: &Query<Aug>,
4571) -> Result<CoercibleScalarExpr, PlanError> {
4572 plan_vector_like_subquery(
4573 ecx,
4574 query,
4575 |_| false,
4576 |elem_type| VariadicFunc::ListCreate { elem_type },
4577 |order_by| AggregateFunc::ListConcat { order_by },
4578 expr_func::ListListConcat.into(),
4579 |elem_type| {
4580 HirScalarExpr::literal(
4581 Datum::empty_list(),
4582 SqlScalarType::List {
4583 element_type: Box::new(elem_type),
4584 custom_id: None,
4585 },
4586 )
4587 },
4588 "list",
4589 )
4590}
4591
4592fn plan_array_subquery(
4593 ecx: &ExprContext,
4594 query: &Query<Aug>,
4595) -> Result<CoercibleScalarExpr, PlanError> {
4596 plan_vector_like_subquery(
4597 ecx,
4598 query,
4599 |elem_type| {
4600 matches!(
4601 elem_type,
4602 SqlScalarType::Char { .. }
4603 | SqlScalarType::Array { .. }
4604 | SqlScalarType::List { .. }
4605 | SqlScalarType::Map { .. }
4606 )
4607 },
4608 |elem_type| VariadicFunc::ArrayCreate { elem_type },
4609 |order_by| AggregateFunc::ArrayConcat { order_by },
4610 expr_func::ArrayArrayConcat.into(),
4611 |elem_type| {
4612 HirScalarExpr::literal(
4613 Datum::empty_array(),
4614 SqlScalarType::Array(Box::new(elem_type)),
4615 )
4616 },
4617 "[]",
4618 )
4619}
4620
4621fn plan_vector_like_subquery<F1, F2, F3, F4>(
4623 ecx: &ExprContext,
4624 query: &Query<Aug>,
4625 is_unsupported_type: F1,
4626 vector_create: F2,
4627 aggregate_concat: F3,
4628 binary_concat: BinaryFunc,
4629 empty_literal: F4,
4630 vector_type_string: &str,
4631) -> Result<CoercibleScalarExpr, PlanError>
4632where
4633 F1: Fn(&SqlScalarType) -> bool,
4634 F2: Fn(SqlScalarType) -> VariadicFunc,
4635 F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4636 F4: Fn(SqlScalarType) -> HirScalarExpr,
4637{
4638 if !ecx.allow_subqueries {
4639 sql_bail!("{} does not allow subqueries", ecx.name)
4640 }
4641
4642 let mut qcx = ecx.derived_query_context();
4643 let mut planned_query = plan_query(&mut qcx, query)?;
4644 if planned_query.limit.is_some()
4645 || !planned_query
4646 .offset
4647 .clone()
4648 .try_into_literal_int64()
4649 .is_ok_and(|offset| offset == 0)
4650 {
4651 planned_query.expr = HirRelationExpr::top_k(
4652 planned_query.expr,
4653 vec![],
4654 planned_query.order_by.clone(),
4655 planned_query.limit,
4656 planned_query.offset,
4657 planned_query.group_size_hints.limit_input_group_size,
4658 );
4659 }
4660
4661 if planned_query.project.len() != 1 {
4662 sql_bail!(
4663 "Expected subselect to return 1 column, got {} columns",
4664 planned_query.project.len()
4665 );
4666 }
4667
4668 let project_column = *planned_query.project.get(0).unwrap();
4669 let elem_type = qcx
4670 .relation_type(&planned_query.expr)
4671 .column_types
4672 .get(project_column)
4673 .cloned()
4674 .unwrap()
4675 .scalar_type();
4676
4677 if is_unsupported_type(&elem_type) {
4678 bail_unsupported!(format!(
4679 "cannot build array from subquery because return type {}{}",
4680 ecx.humanize_scalar_type(&elem_type, false),
4681 vector_type_string
4682 ));
4683 }
4684
4685 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4688 vector_create(elem_type.clone()),
4689 vec![HirScalarExpr::column(project_column)],
4690 ))
4691 .chain(
4692 planned_query
4693 .order_by
4694 .iter()
4695 .map(|co| HirScalarExpr::column(co.column)),
4696 )
4697 .collect();
4698
4699 let aggregation_projection = vec![0];
4703 let aggregation_order_by = planned_query
4704 .order_by
4705 .into_iter()
4706 .enumerate()
4707 .map(|(i, order)| ColumnOrder { column: i, ..order })
4708 .collect();
4709
4710 let reduced_expr = planned_query
4711 .expr
4712 .reduce(
4713 vec![],
4714 vec![AggregateExpr {
4715 func: aggregate_concat(aggregation_order_by),
4716 expr: Box::new(HirScalarExpr::call_variadic(
4717 VariadicFunc::RecordCreate {
4718 field_names: iter::repeat(ColumnName::from(""))
4719 .take(aggregation_exprs.len())
4720 .collect(),
4721 },
4722 aggregation_exprs,
4723 )),
4724 distinct: false,
4725 }],
4726 None,
4727 )
4728 .project(aggregation_projection);
4729
4730 Ok(reduced_expr
4732 .select()
4733 .call_binary(empty_literal(elem_type), binary_concat)
4734 .into())
4735}
4736
4737fn plan_map_subquery(
4738 ecx: &ExprContext,
4739 query: &Query<Aug>,
4740) -> Result<CoercibleScalarExpr, PlanError> {
4741 if !ecx.allow_subqueries {
4742 sql_bail!("{} does not allow subqueries", ecx.name)
4743 }
4744
4745 let mut qcx = ecx.derived_query_context();
4746 let mut query = plan_query(&mut qcx, query)?;
4747 if query.limit.is_some()
4748 || !query
4749 .offset
4750 .clone()
4751 .try_into_literal_int64()
4752 .is_ok_and(|offset| offset == 0)
4753 {
4754 query.expr = HirRelationExpr::top_k(
4755 query.expr,
4756 vec![],
4757 query.order_by.clone(),
4758 query.limit,
4759 query.offset,
4760 query.group_size_hints.limit_input_group_size,
4761 );
4762 }
4763 if query.project.len() != 2 {
4764 sql_bail!(
4765 "expected map subquery to return 2 columns, got {} columns",
4766 query.project.len()
4767 );
4768 }
4769
4770 let query_types = qcx.relation_type(&query.expr).column_types;
4771 let key_column = query.project[0];
4772 let key_type = query_types[key_column].clone().scalar_type();
4773 let value_column = query.project[1];
4774 let value_type = query_types[value_column].clone().scalar_type();
4775
4776 if key_type != SqlScalarType::String {
4777 sql_bail!("cannot build map from subquery because first column is not of type text");
4778 }
4779
4780 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4781 VariadicFunc::RecordCreate {
4782 field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4783 },
4784 vec![
4785 HirScalarExpr::column(key_column),
4786 HirScalarExpr::column(value_column),
4787 ],
4788 ))
4789 .chain(
4790 query
4791 .order_by
4792 .iter()
4793 .map(|co| HirScalarExpr::column(co.column)),
4794 )
4795 .collect();
4796
4797 let expr = query
4798 .expr
4799 .reduce(
4800 vec![],
4801 vec![AggregateExpr {
4802 func: AggregateFunc::MapAgg {
4803 order_by: query
4804 .order_by
4805 .into_iter()
4806 .enumerate()
4807 .map(|(i, order)| ColumnOrder { column: i, ..order })
4808 .collect(),
4809 value_type: value_type.clone(),
4810 },
4811 expr: Box::new(HirScalarExpr::call_variadic(
4812 VariadicFunc::RecordCreate {
4813 field_names: iter::repeat(ColumnName::from(""))
4814 .take(aggregation_exprs.len())
4815 .collect(),
4816 },
4817 aggregation_exprs,
4818 )),
4819 distinct: false,
4820 }],
4821 None,
4822 )
4823 .project(vec![0]);
4824
4825 let expr = HirScalarExpr::call_variadic(
4827 VariadicFunc::Coalesce,
4828 vec![
4829 expr.select(),
4830 HirScalarExpr::literal(
4831 Datum::empty_map(),
4832 SqlScalarType::Map {
4833 value_type: Box::new(value_type),
4834 custom_id: None,
4835 },
4836 ),
4837 ],
4838 );
4839
4840 Ok(expr.into())
4841}
4842
4843fn plan_collate(
4844 ecx: &ExprContext,
4845 expr: &Expr<Aug>,
4846 collation: &UnresolvedItemName,
4847) -> Result<CoercibleScalarExpr, PlanError> {
4848 if collation.0.len() == 2
4849 && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4850 && collation.0[1] == ident!("default")
4851 {
4852 plan_expr(ecx, expr)
4853 } else {
4854 bail_unsupported!("COLLATE");
4855 }
4856}
4857
4858fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4865where
4866 E: std::borrow::Borrow<Expr<Aug>>,
4867{
4868 let mut out = vec![];
4869 for expr in exprs {
4870 out.push(plan_expr(ecx, expr.borrow())?);
4871 }
4872 Ok(out)
4873}
4874
4875fn plan_array(
4877 ecx: &ExprContext,
4878 exprs: &[Expr<Aug>],
4879 type_hint: Option<&SqlScalarType>,
4880) -> Result<CoercibleScalarExpr, PlanError> {
4881 let mut out = vec![];
4883 for expr in exprs {
4884 out.push(match expr {
4885 Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4888 _ => plan_expr(ecx, expr)?,
4889 });
4890 }
4891
4892 let type_hint = match type_hint {
4894 Some(SqlScalarType::Array(elem_type)) => {
4899 let multidimensional = out.iter().any(|e| {
4900 matches!(
4901 ecx.scalar_type(e),
4902 CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4903 )
4904 });
4905 if multidimensional {
4906 type_hint
4907 } else {
4908 Some(&**elem_type)
4909 }
4910 }
4911 Some(_) => None,
4915 None => None,
4917 };
4918
4919 let (elem_type, exprs) = if exprs.is_empty() {
4921 if let Some(elem_type) = type_hint {
4922 (elem_type.clone(), vec![])
4923 } else {
4924 sql_bail!("cannot determine type of empty array");
4925 }
4926 } else {
4927 let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4928 (ecx.scalar_type(&out[0]), out)
4929 };
4930
4931 if matches!(
4937 elem_type,
4938 SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4939 ) {
4940 bail_unsupported!(format!("{}[]", ecx.humanize_scalar_type(&elem_type, false)));
4941 }
4942
4943 Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayCreate { elem_type }, exprs).into())
4944}
4945
4946fn plan_list(
4947 ecx: &ExprContext,
4948 exprs: &[Expr<Aug>],
4949 type_hint: Option<&SqlScalarType>,
4950) -> Result<CoercibleScalarExpr, PlanError> {
4951 let (elem_type, exprs) = if exprs.is_empty() {
4952 if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
4953 (element_type.without_modifiers(), vec![])
4954 } else {
4955 sql_bail!("cannot determine type of empty list");
4956 }
4957 } else {
4958 let type_hint = match type_hint {
4959 Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
4960 _ => None,
4961 };
4962
4963 let mut out = vec![];
4964 for expr in exprs {
4965 out.push(match expr {
4966 Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
4969 _ => plan_expr(ecx, expr)?,
4970 });
4971 }
4972 let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
4973 (ecx.scalar_type(&out[0]).without_modifiers(), out)
4974 };
4975
4976 if matches!(elem_type, SqlScalarType::Char { .. }) {
4977 bail_unsupported!("char list");
4978 }
4979
4980 Ok(HirScalarExpr::call_variadic(VariadicFunc::ListCreate { elem_type }, exprs).into())
4981}
4982
4983fn plan_map(
4984 ecx: &ExprContext,
4985 entries: &[MapEntry<Aug>],
4986 type_hint: Option<&SqlScalarType>,
4987) -> Result<CoercibleScalarExpr, PlanError> {
4988 let (value_type, exprs) = if entries.is_empty() {
4989 if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
4990 (value_type.without_modifiers(), vec![])
4991 } else {
4992 sql_bail!("cannot determine type of empty map");
4993 }
4994 } else {
4995 let type_hint = match type_hint {
4996 Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
4997 _ => None,
4998 };
4999
5000 let mut keys = vec![];
5001 let mut values = vec![];
5002 for MapEntry { key, value } in entries {
5003 let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5004 let value = match value {
5005 Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5008 _ => plan_expr(ecx, value)?,
5009 };
5010 keys.push(key);
5011 values.push(value);
5012 }
5013 let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5014 let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5015 let out = itertools::interleave(keys, values).collect();
5016 (value_type, out)
5017 };
5018
5019 if matches!(value_type, SqlScalarType::Char { .. }) {
5020 bail_unsupported!("char map");
5021 }
5022
5023 let expr = HirScalarExpr::call_variadic(VariadicFunc::MapBuild { value_type }, exprs);
5024 Ok(expr.into())
5025}
5026
5027pub fn coerce_homogeneous_exprs(
5044 ecx: &ExprContext,
5045 exprs: Vec<CoercibleScalarExpr>,
5046 force_type: Option<&SqlScalarType>,
5047) -> Result<Vec<HirScalarExpr>, PlanError> {
5048 assert!(!exprs.is_empty());
5049
5050 let target_holder;
5051 let target = match force_type {
5052 Some(t) => t,
5053 None => {
5054 let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5055 target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5056 &target_holder
5057 }
5058 };
5059
5060 let mut out = Vec::new();
5062 for expr in exprs {
5063 let arg = typeconv::plan_coerce(ecx, expr, target)?;
5064 let ccx = match force_type {
5065 None => CastContext::Implicit,
5066 Some(_) => CastContext::Explicit,
5067 };
5068 match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5069 Ok(expr) => out.push(expr),
5070 Err(_) => sql_bail!(
5071 "{} could not convert type {} to {}",
5072 ecx.name,
5073 ecx.humanize_scalar_type(&ecx.scalar_type(&arg), false),
5074 ecx.humanize_scalar_type(target, false),
5075 ),
5076 }
5077 }
5078 Ok(out)
5079}
5080
5081pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5084 obe: &OrderByExpr<T>,
5085 column: usize,
5086) -> ColumnOrder {
5087 let desc = !obe.asc.unwrap_or(true);
5088 ColumnOrder {
5089 column,
5090 desc,
5091 nulls_last: obe.nulls_last.unwrap_or(!desc),
5094 }
5095}
5096
5097fn plan_function_order_by(
5105 ecx: &ExprContext,
5106 order_by: &[OrderByExpr<Aug>],
5107) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5108 let mut order_by_exprs = vec![];
5109 let mut col_orders = vec![];
5110 {
5111 for (i, obe) in order_by.iter().enumerate() {
5112 let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5116 order_by_exprs.push(expr);
5117 col_orders.push(resolve_desc_and_nulls_last(obe, i));
5118 }
5119 }
5120 Ok((order_by_exprs, col_orders))
5121}
5122
5123fn plan_aggregate_common(
5125 ecx: &ExprContext,
5126 Function::<Aug> {
5127 name,
5128 args,
5129 filter,
5130 over: _,
5131 distinct,
5132 }: &Function<Aug>,
5133) -> Result<AggregateExpr, PlanError> {
5134 let impls = match resolve_func(ecx, name, args)? {
5149 Func::Aggregate(impls) => impls,
5150 _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
5151 };
5152
5153 let (args, order_by) = match &args {
5162 FunctionArgs::Star => (vec![], vec![]),
5163 FunctionArgs::Args { args, order_by } => {
5164 if args.is_empty() {
5165 sql_bail!(
5166 "{}(*) must be used to call a parameterless aggregate function",
5167 ecx.qcx
5168 .scx
5169 .humanize_resolved_name(name)
5170 .expect("name actually resolved")
5171 );
5172 }
5173 let args = plan_exprs(ecx, args)?;
5174 (args, order_by.clone())
5175 }
5176 };
5177
5178 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5179
5180 let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5181 if let Some(filter) = &filter {
5182 let cond =
5192 plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5193 let expr_typ = ecx.scalar_type(&expr);
5194 expr = HirScalarExpr::if_then_else(
5195 cond,
5196 expr,
5197 HirScalarExpr::literal(func.identity_datum(), expr_typ),
5198 );
5199 }
5200
5201 let mut seen_outer = false;
5202 let mut seen_inner = false;
5203 #[allow(deprecated)]
5204 expr.visit_columns(0, &mut |depth, col| {
5205 if depth == 0 && col.level == 0 {
5206 seen_inner = true;
5207 } else if col.level > depth {
5208 seen_outer = true;
5209 }
5210 });
5211 if seen_outer && !seen_inner {
5212 bail_unsupported!(
5213 3720,
5214 "aggregate functions that refer exclusively to outer columns"
5215 );
5216 }
5217
5218 if func.is_order_sensitive() {
5221 let field_names = iter::repeat(ColumnName::from(""))
5222 .take(1 + order_by_exprs.len())
5223 .collect();
5224 let mut exprs = vec![expr];
5225 exprs.extend(order_by_exprs);
5226 expr = HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs);
5227 }
5228
5229 Ok(AggregateExpr {
5230 func,
5231 expr: Box::new(expr),
5232 distinct: *distinct,
5233 })
5234}
5235
5236fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
5237 let mut names = names.to_vec();
5238 let col_name = normalize::column_name(names.pop().unwrap());
5239
5240 if !names.is_empty() {
5242 let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
5243 let (i, i_name) = ecx.scope.resolve_table_column(
5244 &ecx.qcx.outer_scopes,
5245 &table_name,
5246 &col_name,
5247 &mut ecx.qcx.name_manager.borrow_mut(),
5248 )?;
5249 return Ok(HirScalarExpr::named_column(i, i_name));
5250 }
5251
5252 let similar_names = match ecx.scope.resolve_column(
5255 &ecx.qcx.outer_scopes,
5256 &col_name,
5257 &mut ecx.qcx.name_manager.borrow_mut(),
5258 ) {
5259 Ok((i, i_name)) => {
5260 return Ok(HirScalarExpr::named_column(i, i_name));
5261 }
5262 Err(PlanError::UnknownColumn { similar, .. }) => similar,
5263 Err(e) => return Err(e),
5264 };
5265
5266 let items = ecx.scope.items_from_table(
5269 &ecx.qcx.outer_scopes,
5270 &PartialItemName {
5271 database: None,
5272 schema: None,
5273 item: col_name.as_str().to_owned(),
5274 },
5275 )?;
5276 match items.as_slice() {
5277 [] => Err(PlanError::UnknownColumn {
5279 table: None,
5280 column: col_name,
5281 similar: similar_names,
5282 }),
5283 [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
5288 *column,
5289 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5290 )),
5291 _ => {
5294 let mut has_exists_column = None;
5295 let (exprs, field_names): (Vec<_>, Vec<_>) = items
5296 .into_iter()
5297 .filter_map(|(column, item)| {
5298 if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
5299 has_exists_column = Some(column);
5300 None
5301 } else {
5302 let expr = HirScalarExpr::named_column(
5303 column,
5304 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5305 );
5306 let name = item.column_name.clone();
5307 Some((expr, name))
5308 }
5309 })
5310 .unzip();
5311 let expr = if exprs.len() == 1 && has_exists_column.is_some() {
5313 exprs.into_element()
5314 } else {
5315 HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs)
5316 };
5317 if let Some(has_exists_column) = has_exists_column {
5318 Ok(HirScalarExpr::if_then_else(
5319 HirScalarExpr::unnamed_column(has_exists_column)
5320 .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
5321 HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
5322 expr,
5323 ))
5324 } else {
5325 Ok(expr)
5326 }
5327 }
5328 }
5329}
5330
5331fn plan_op(
5332 ecx: &ExprContext,
5333 op: &str,
5334 expr1: &Expr<Aug>,
5335 expr2: Option<&Expr<Aug>>,
5336) -> Result<HirScalarExpr, PlanError> {
5337 let impls = func::resolve_op(op)?;
5338 let args = match expr2 {
5339 None => plan_exprs(ecx, &[expr1])?,
5340 Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5341 };
5342 func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5343}
5344
5345fn plan_function<'a>(
5346 ecx: &ExprContext,
5347 f @ Function {
5348 name,
5349 args,
5350 filter,
5351 over,
5352 distinct,
5353 }: &'a Function<Aug>,
5354) -> Result<HirScalarExpr, PlanError> {
5355 let impls = match resolve_func(ecx, name, args)? {
5356 Func::Table(_) => {
5357 sql_bail!(
5358 "table functions are not allowed in {} (function {})",
5359 ecx.name,
5360 name
5361 );
5362 }
5363 Func::Scalar(impls) => {
5364 if over.is_some() {
5365 sql_bail!(
5366 "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
5367 );
5368 }
5369 impls
5370 }
5371 Func::ScalarWindow(impls) => {
5372 let (
5373 ignore_nulls,
5374 order_by_exprs,
5375 col_orders,
5376 _window_frame,
5377 partition_by,
5378 scalar_args,
5379 ) = plan_window_function_non_aggr(ecx, f)?;
5380
5381 if !scalar_args.is_empty() {
5385 if let ResolvedItemName::Item {
5386 full_name: FullItemName { item, .. },
5387 ..
5388 } = name
5389 {
5390 sql_bail!(
5391 "function {} has 0 parameters, but was called with {}",
5392 item,
5393 scalar_args.len()
5394 );
5395 }
5396 }
5397
5398 let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5403
5404 if ignore_nulls {
5405 bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5408 }
5409
5410 return Ok(HirScalarExpr::windowing(WindowExpr {
5411 func: WindowExprType::Scalar(ScalarWindowExpr {
5412 func,
5413 order_by: col_orders,
5414 }),
5415 partition_by,
5416 order_by: order_by_exprs,
5417 }));
5418 }
5419 Func::ValueWindow(impls) => {
5420 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, scalar_args) =
5421 plan_window_function_non_aggr(ecx, f)?;
5422
5423 let (args_encoded, func) =
5424 func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5425
5426 if ignore_nulls {
5427 match func {
5428 ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
5429 _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
5430 }
5431 }
5432
5433 return Ok(HirScalarExpr::windowing(WindowExpr {
5434 func: WindowExprType::Value(ValueWindowExpr {
5435 func,
5436 args: Box::new(args_encoded),
5437 order_by: col_orders,
5438 window_frame,
5439 ignore_nulls, }),
5441 partition_by,
5442 order_by: order_by_exprs,
5443 }));
5444 }
5445 Func::Aggregate(_) => {
5446 if f.over.is_none() {
5447 if ecx.allow_aggregates {
5449 sql_bail!(
5452 "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
5453 name,
5454 );
5455 } else {
5456 sql_bail!(
5459 "aggregate functions are not allowed in {} (function {})",
5460 ecx.name,
5461 name
5462 );
5463 }
5464 } else {
5465 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
5466 plan_window_function_common(ecx, &f.name, &f.over)?;
5467
5468 match (&window_frame.start_bound, &window_frame.end_bound) {
5470 (
5471 mz_expr::WindowFrameBound::UnboundedPreceding,
5472 mz_expr::WindowFrameBound::OffsetPreceding(..),
5473 )
5474 | (
5475 mz_expr::WindowFrameBound::UnboundedPreceding,
5476 mz_expr::WindowFrameBound::OffsetFollowing(..),
5477 )
5478 | (
5479 mz_expr::WindowFrameBound::OffsetPreceding(..),
5480 mz_expr::WindowFrameBound::UnboundedFollowing,
5481 )
5482 | (
5483 mz_expr::WindowFrameBound::OffsetFollowing(..),
5484 mz_expr::WindowFrameBound::UnboundedFollowing,
5485 ) => bail_unsupported!("mixed unbounded - offset frames"),
5486 (_, _) => {} }
5488
5489 if ignore_nulls {
5490 bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5494 }
5495
5496 let aggregate_expr = plan_aggregate_common(ecx, f)?;
5497
5498 if aggregate_expr.distinct {
5499 bail_unsupported!("DISTINCT in window aggregates");
5501 }
5502
5503 return Ok(HirScalarExpr::windowing(WindowExpr {
5504 func: WindowExprType::Aggregate(AggregateWindowExpr {
5505 aggregate_expr,
5506 order_by: col_orders,
5507 window_frame,
5508 }),
5509 partition_by,
5510 order_by: order_by_exprs,
5511 }));
5512 }
5513 }
5514 };
5515
5516 if over.is_some() {
5517 unreachable!("If there is an OVER clause, we should have returned already above.");
5518 }
5519
5520 if *distinct {
5521 sql_bail!(
5522 "DISTINCT specified, but {} is not an aggregate function",
5523 ecx.qcx
5524 .scx
5525 .humanize_resolved_name(name)
5526 .expect("already resolved")
5527 );
5528 }
5529 if filter.is_some() {
5530 sql_bail!(
5531 "FILTER specified, but {} is not an aggregate function",
5532 ecx.qcx
5533 .scx
5534 .humanize_resolved_name(name)
5535 .expect("already resolved")
5536 );
5537 }
5538
5539 let scalar_args = match &args {
5540 FunctionArgs::Star => {
5541 sql_bail!(
5542 "* argument is invalid with non-aggregate function {}",
5543 ecx.qcx
5544 .scx
5545 .humanize_resolved_name(name)
5546 .expect("already resolved")
5547 )
5548 }
5549 FunctionArgs::Args { args, order_by } => {
5550 if !order_by.is_empty() {
5551 sql_bail!(
5552 "ORDER BY specified, but {} is not an aggregate function",
5553 ecx.qcx
5554 .scx
5555 .humanize_resolved_name(name)
5556 .expect("already resolved")
5557 );
5558 }
5559 plan_exprs(ecx, args)?
5560 }
5561 };
5562
5563 func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
5564}
5565
5566pub const IGNORE_NULLS_ERROR_MSG: &str =
5567 "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5568
5569pub fn resolve_func(
5573 ecx: &ExprContext,
5574 name: &ResolvedItemName,
5575 args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5576) -> Result<&'static Func, PlanError> {
5577 if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5578 if let Ok(f) = i.func() {
5579 return Ok(f);
5580 }
5581 }
5582
5583 let cexprs = match args {
5586 mz_sql_parser::ast::FunctionArgs::Star => vec![],
5587 mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5588 if !order_by.is_empty() {
5589 sql_bail!(
5590 "ORDER BY specified, but {} is not an aggregate function",
5591 name
5592 );
5593 }
5594 plan_exprs(ecx, args)?
5595 }
5596 };
5597
5598 let arg_types: Vec<_> = cexprs
5599 .into_iter()
5600 .map(|ty| match ecx.scalar_type(&ty) {
5601 CoercibleScalarType::Coerced(ty) => ecx.humanize_scalar_type(&ty, false),
5602 CoercibleScalarType::Record(_) => "record".to_string(),
5603 CoercibleScalarType::Uncoerced => "unknown".to_string(),
5604 })
5605 .collect();
5606
5607 Err(PlanError::UnknownFunction {
5608 name: name.to_string(),
5609 arg_types,
5610 })
5611}
5612
5613fn plan_is_expr<'a>(
5614 ecx: &ExprContext,
5615 expr: &'a Expr<Aug>,
5616 construct: &IsExprConstruct<Aug>,
5617 not: bool,
5618) -> Result<HirScalarExpr, PlanError> {
5619 let expr = plan_expr(ecx, expr)?;
5620 let mut expr = match construct {
5621 IsExprConstruct::Null => {
5622 let expr = expr.type_as_any(ecx)?;
5627 expr.call_is_null()
5628 }
5629 IsExprConstruct::Unknown => {
5630 let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5631 expr.call_is_null()
5632 }
5633 IsExprConstruct::True => {
5634 let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5635 expr.call_unary(UnaryFunc::IsTrue(expr_func::IsTrue))
5636 }
5637 IsExprConstruct::False => {
5638 let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5639 expr.call_unary(UnaryFunc::IsFalse(expr_func::IsFalse))
5640 }
5641 IsExprConstruct::DistinctFrom(expr2) => {
5642 let expr1 = expr.type_as_any(ecx)?;
5643 let expr2 = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5644 let term1 = HirScalarExpr::variadic_or(vec![
5651 expr1.clone().call_binary(expr2.clone(), expr_func::NotEq),
5652 expr1.clone().call_is_null(),
5653 expr2.clone().call_is_null(),
5654 ]);
5655 let term2 = HirScalarExpr::variadic_or(vec![
5656 expr1.call_is_null().not(),
5657 expr2.call_is_null().not(),
5658 ]);
5659 term1.and(term2)
5660 }
5661 };
5662 if not {
5663 expr = expr.not();
5664 }
5665 Ok(expr)
5666}
5667
5668fn plan_case<'a>(
5669 ecx: &ExprContext,
5670 operand: &'a Option<Box<Expr<Aug>>>,
5671 conditions: &'a [Expr<Aug>],
5672 results: &'a [Expr<Aug>],
5673 else_result: &'a Option<Box<Expr<Aug>>>,
5674) -> Result<HirScalarExpr, PlanError> {
5675 let mut cond_exprs = Vec::new();
5676 let mut result_exprs = Vec::new();
5677 for (c, r) in conditions.iter().zip_eq(results) {
5678 let c = match operand {
5679 Some(operand) => operand.clone().equals(c.clone()),
5680 None => c.clone(),
5681 };
5682 let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5683 cond_exprs.push(cexpr);
5684 result_exprs.push(r);
5685 }
5686 result_exprs.push(match else_result {
5687 Some(else_result) => else_result,
5688 None => &Expr::Value(Value::Null),
5689 });
5690 let mut result_exprs = coerce_homogeneous_exprs(
5691 &ecx.with_name("CASE"),
5692 plan_exprs(ecx, &result_exprs)?,
5693 None,
5694 )?;
5695 let mut expr = result_exprs.pop().unwrap();
5696 assert_eq!(cond_exprs.len(), result_exprs.len());
5697 for (cexpr, rexpr) in cond_exprs
5698 .into_iter()
5699 .rev()
5700 .zip_eq(result_exprs.into_iter().rev())
5701 {
5702 expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5703 }
5704 Ok(expr)
5705}
5706
5707fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5708 let (datum, scalar_type) = match l {
5709 Value::Number(s) => {
5710 let d = strconv::parse_numeric(s.as_str())?;
5711 if !s.contains(&['E', '.'][..]) {
5712 if let Ok(n) = d.0.try_into() {
5714 (Datum::Int32(n), SqlScalarType::Int32)
5715 } else if let Ok(n) = d.0.try_into() {
5716 (Datum::Int64(n), SqlScalarType::Int64)
5717 } else {
5718 (
5719 Datum::Numeric(d),
5720 SqlScalarType::Numeric { max_scale: None },
5721 )
5722 }
5723 } else {
5724 (
5725 Datum::Numeric(d),
5726 SqlScalarType::Numeric { max_scale: None },
5727 )
5728 }
5729 }
5730 Value::HexString(_) => bail_unsupported!("hex string literals"),
5731 Value::Boolean(b) => match b {
5732 false => (Datum::False, SqlScalarType::Bool),
5733 true => (Datum::True, SqlScalarType::Bool),
5734 },
5735 Value::Interval(i) => {
5736 let i = literal::plan_interval(i)?;
5737 (Datum::Interval(i), SqlScalarType::Interval)
5738 }
5739 Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5740 Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5741 };
5742 let expr = HirScalarExpr::literal(datum, scalar_type);
5743 Ok(expr.into())
5744}
5745
5746fn plan_window_function_non_aggr<'a>(
5749 ecx: &ExprContext,
5750 Function {
5751 name,
5752 args,
5753 filter,
5754 over,
5755 distinct,
5756 }: &'a Function<Aug>,
5757) -> Result<
5758 (
5759 bool,
5760 Vec<HirScalarExpr>,
5761 Vec<ColumnOrder>,
5762 mz_expr::WindowFrame,
5763 Vec<HirScalarExpr>,
5764 Vec<CoercibleScalarExpr>,
5765 ),
5766 PlanError,
5767> {
5768 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5769 plan_window_function_common(ecx, name, over)?;
5770
5771 if *distinct {
5772 sql_bail!(
5773 "DISTINCT specified, but {} is not an aggregate function",
5774 name
5775 );
5776 }
5777
5778 if filter.is_some() {
5779 bail_unsupported!("FILTER in non-aggregate window functions");
5780 }
5781
5782 let scalar_args = match &args {
5783 FunctionArgs::Star => {
5784 sql_bail!("* argument is invalid with non-aggregate function {}", name)
5785 }
5786 FunctionArgs::Args { args, order_by } => {
5787 if !order_by.is_empty() {
5788 sql_bail!(
5789 "ORDER BY specified, but {} is not an aggregate function",
5790 name
5791 );
5792 }
5793 plan_exprs(ecx, args)?
5794 }
5795 };
5796
5797 Ok((
5798 ignore_nulls,
5799 order_by_exprs,
5800 col_orders,
5801 window_frame,
5802 partition,
5803 scalar_args,
5804 ))
5805}
5806
5807fn plan_window_function_common(
5809 ecx: &ExprContext,
5810 name: &<Aug as AstInfo>::ItemName,
5811 over: &Option<WindowSpec<Aug>>,
5812) -> Result<
5813 (
5814 bool,
5815 Vec<HirScalarExpr>,
5816 Vec<ColumnOrder>,
5817 mz_expr::WindowFrame,
5818 Vec<HirScalarExpr>,
5819 ),
5820 PlanError,
5821> {
5822 if !ecx.allow_windows {
5823 sql_bail!(
5824 "window functions are not allowed in {} (function {})",
5825 ecx.name,
5826 name
5827 );
5828 }
5829
5830 let window_spec = match over.as_ref() {
5831 Some(over) => over,
5832 None => sql_bail!("window function {} requires an OVER clause", name),
5833 };
5834 if window_spec.ignore_nulls && window_spec.respect_nulls {
5835 sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5836 }
5837 let window_frame = match window_spec.window_frame.as_ref() {
5838 Some(frame) => plan_window_frame(frame)?,
5839 None => mz_expr::WindowFrame::default(),
5840 };
5841 let mut partition = Vec::new();
5842 for expr in &window_spec.partition_by {
5843 partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5844 }
5845
5846 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5847
5848 Ok((
5849 window_spec.ignore_nulls,
5850 order_by_exprs,
5851 col_orders,
5852 window_frame,
5853 partition,
5854 ))
5855}
5856
5857fn plan_window_frame(
5858 WindowFrame {
5859 units,
5860 start_bound,
5861 end_bound,
5862 }: &WindowFrame,
5863) -> Result<mz_expr::WindowFrame, PlanError> {
5864 use mz_expr::WindowFrameBound::*;
5865 let units = window_frame_unit_ast_to_expr(units)?;
5866 let start_bound = window_frame_bound_ast_to_expr(start_bound);
5867 let end_bound = end_bound
5868 .as_ref()
5869 .map(window_frame_bound_ast_to_expr)
5870 .unwrap_or(CurrentRow);
5871
5872 match (&start_bound, &end_bound) {
5874 (UnboundedFollowing, _) => {
5876 sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5877 }
5878 (_, UnboundedPreceding) => {
5880 sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5881 }
5882 (CurrentRow, OffsetPreceding(_)) => {
5884 sql_bail!("frame starting from current row cannot have preceding rows")
5885 }
5886 (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5887 sql_bail!("frame starting from following row cannot have preceding rows")
5888 }
5889 (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5892 if *o1 > 1000000 || *o2 > 1000000 {
5896 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5897 }
5898 }
5899 (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5900 if *o1 > 1000000 || *o2 > 1000000 {
5901 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5902 }
5903 }
5904 (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5905 if *o1 > 1000000 || *o2 > 1000000 {
5906 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5907 }
5908 }
5909 (OffsetPreceding(o), CurrentRow) => {
5910 if *o > 1000000 {
5911 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5912 }
5913 }
5914 (CurrentRow, OffsetFollowing(o)) => {
5915 if *o > 1000000 {
5916 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5917 }
5918 }
5919 (_, _) => (),
5921 }
5922
5923 if units == mz_expr::WindowFrameUnits::Range
5926 && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5927 {
5928 bail_unsupported!("RANGE in non-default window frames")
5929 }
5930
5931 let frame = mz_expr::WindowFrame {
5932 units,
5933 start_bound,
5934 end_bound,
5935 };
5936 Ok(frame)
5937}
5938
5939fn window_frame_unit_ast_to_expr(
5940 unit: &WindowFrameUnits,
5941) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5942 match unit {
5943 WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5944 WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5945 WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5946 }
5947}
5948
5949fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5950 match bound {
5951 WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5952 WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5953 WindowFrameBound::Preceding(Some(offset)) => {
5954 mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5955 }
5956 WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
5957 WindowFrameBound::Following(Some(offset)) => {
5958 mz_expr::WindowFrameBound::OffsetFollowing(*offset)
5959 }
5960 }
5961}
5962
5963pub fn scalar_type_from_sql(
5964 scx: &StatementContext,
5965 data_type: &ResolvedDataType,
5966) -> Result<SqlScalarType, PlanError> {
5967 match data_type {
5968 ResolvedDataType::AnonymousList(elem_type) => {
5969 let elem_type = scalar_type_from_sql(scx, elem_type)?;
5970 if matches!(elem_type, SqlScalarType::Char { .. }) {
5971 bail_unsupported!("char list");
5972 }
5973 Ok(SqlScalarType::List {
5974 element_type: Box::new(elem_type),
5975 custom_id: None,
5976 })
5977 }
5978 ResolvedDataType::AnonymousMap {
5979 key_type,
5980 value_type,
5981 } => {
5982 match scalar_type_from_sql(scx, key_type)? {
5983 SqlScalarType::String => {}
5984 other => sql_bail!(
5985 "map key type must be {}, got {}",
5986 scx.humanize_scalar_type(&SqlScalarType::String, false),
5987 scx.humanize_scalar_type(&other, false)
5988 ),
5989 }
5990 Ok(SqlScalarType::Map {
5991 value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
5992 custom_id: None,
5993 })
5994 }
5995 ResolvedDataType::Named { id, modifiers, .. } => {
5996 scalar_type_from_catalog(scx.catalog, *id, modifiers)
5997 }
5998 ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
5999 }
6000}
6001
6002pub fn scalar_type_from_catalog(
6003 catalog: &dyn SessionCatalog,
6004 id: CatalogItemId,
6005 modifiers: &[i64],
6006) -> Result<SqlScalarType, PlanError> {
6007 let entry = catalog.get_item(&id);
6008 let type_details = match entry.type_details() {
6009 Some(type_details) => type_details,
6010 None => {
6011 sql_bail!(
6014 "internal error: {} does not refer to a type",
6015 catalog.resolve_full_name(entry.name()).to_string().quoted()
6016 );
6017 }
6018 };
6019 match &type_details.typ {
6020 CatalogType::Numeric => {
6021 let mut modifiers = modifiers.iter().fuse();
6022 let precision = match modifiers.next() {
6023 Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6024 sql_bail!(
6025 "precision for type numeric must be between 1 and {}",
6026 NUMERIC_DATUM_MAX_PRECISION,
6027 );
6028 }
6029 Some(p) => Some(*p),
6030 None => None,
6031 };
6032 let scale = match modifiers.next() {
6033 Some(scale) => {
6034 if let Some(precision) = precision {
6035 if *scale > precision {
6036 sql_bail!(
6037 "scale for type numeric must be between 0 and precision {}",
6038 precision
6039 );
6040 }
6041 }
6042 Some(NumericMaxScale::try_from(*scale)?)
6043 }
6044 None => None,
6045 };
6046 if modifiers.next().is_some() {
6047 sql_bail!("type numeric supports at most two type modifiers");
6048 }
6049 Ok(SqlScalarType::Numeric { max_scale: scale })
6050 }
6051 CatalogType::Char => {
6052 let mut modifiers = modifiers.iter().fuse();
6053 let length = match modifiers.next() {
6054 Some(l) => Some(CharLength::try_from(*l)?),
6055 None => Some(CharLength::ONE),
6056 };
6057 if modifiers.next().is_some() {
6058 sql_bail!("type character supports at most one type modifier");
6059 }
6060 Ok(SqlScalarType::Char { length })
6061 }
6062 CatalogType::VarChar => {
6063 let mut modifiers = modifiers.iter().fuse();
6064 let length = match modifiers.next() {
6065 Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6066 None => None,
6067 };
6068 if modifiers.next().is_some() {
6069 sql_bail!("type character varying supports at most one type modifier");
6070 }
6071 Ok(SqlScalarType::VarChar { max_length: length })
6072 }
6073 CatalogType::Timestamp => {
6074 let mut modifiers = modifiers.iter().fuse();
6075 let precision = match modifiers.next() {
6076 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6077 None => None,
6078 };
6079 if modifiers.next().is_some() {
6080 sql_bail!("type timestamp supports at most one type modifier");
6081 }
6082 Ok(SqlScalarType::Timestamp { precision })
6083 }
6084 CatalogType::TimestampTz => {
6085 let mut modifiers = modifiers.iter().fuse();
6086 let precision = match modifiers.next() {
6087 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6088 None => None,
6089 };
6090 if modifiers.next().is_some() {
6091 sql_bail!("type timestamp with time zone supports at most one type modifier");
6092 }
6093 Ok(SqlScalarType::TimestampTz { precision })
6094 }
6095 t => {
6096 if !modifiers.is_empty() {
6097 sql_bail!(
6098 "{} does not support type modifiers",
6099 catalog.resolve_full_name(entry.name()).to_string()
6100 );
6101 }
6102 match t {
6103 CatalogType::Array {
6104 element_reference: element_id,
6105 } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6106 catalog,
6107 *element_id,
6108 modifiers,
6109 )?))),
6110 CatalogType::List {
6111 element_reference: element_id,
6112 element_modifiers,
6113 } => Ok(SqlScalarType::List {
6114 element_type: Box::new(scalar_type_from_catalog(
6115 catalog,
6116 *element_id,
6117 element_modifiers,
6118 )?),
6119 custom_id: Some(id),
6120 }),
6121 CatalogType::Map {
6122 key_reference: _,
6123 key_modifiers: _,
6124 value_reference: value_id,
6125 value_modifiers,
6126 } => Ok(SqlScalarType::Map {
6127 value_type: Box::new(scalar_type_from_catalog(
6128 catalog,
6129 *value_id,
6130 value_modifiers,
6131 )?),
6132 custom_id: Some(id),
6133 }),
6134 CatalogType::Range {
6135 element_reference: element_id,
6136 } => Ok(SqlScalarType::Range {
6137 element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6138 }),
6139 CatalogType::Record { fields } => {
6140 let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6141 .iter()
6142 .map(|f| {
6143 let scalar_type = scalar_type_from_catalog(
6144 catalog,
6145 f.type_reference,
6146 &f.type_modifiers,
6147 )?;
6148 Ok((
6149 f.name.clone(),
6150 SqlColumnType {
6151 scalar_type,
6152 nullable: true,
6153 },
6154 ))
6155 })
6156 .collect::<Result<Box<_>, PlanError>>()?;
6157 Ok(SqlScalarType::Record {
6158 fields: scalars,
6159 custom_id: Some(id),
6160 })
6161 }
6162 CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6163 CatalogType::Bool => Ok(SqlScalarType::Bool),
6164 CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6165 CatalogType::Date => Ok(SqlScalarType::Date),
6166 CatalogType::Float32 => Ok(SqlScalarType::Float32),
6167 CatalogType::Float64 => Ok(SqlScalarType::Float64),
6168 CatalogType::Int16 => Ok(SqlScalarType::Int16),
6169 CatalogType::Int32 => Ok(SqlScalarType::Int32),
6170 CatalogType::Int64 => Ok(SqlScalarType::Int64),
6171 CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6172 CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6173 CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6174 CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6175 CatalogType::Interval => Ok(SqlScalarType::Interval),
6176 CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6177 CatalogType::Oid => Ok(SqlScalarType::Oid),
6178 CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6179 CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6180 CatalogType::Pseudo => {
6181 sql_bail!(
6182 "cannot reference pseudo type {}",
6183 catalog.resolve_full_name(entry.name()).to_string()
6184 )
6185 }
6186 CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6187 CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6188 CatalogType::RegType => Ok(SqlScalarType::RegType),
6189 CatalogType::String => Ok(SqlScalarType::String),
6190 CatalogType::Time => Ok(SqlScalarType::Time),
6191 CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6192 CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6193 CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6194 CatalogType::Numeric => unreachable!("handled above"),
6195 CatalogType::Char => unreachable!("handled above"),
6196 CatalogType::VarChar => unreachable!("handled above"),
6197 CatalogType::Timestamp => unreachable!("handled above"),
6198 CatalogType::TimestampTz => unreachable!("handled above"),
6199 }
6200 }
6201 }
6202}
6203
6204struct AggregateTableFuncVisitor<'a> {
6207 scx: &'a StatementContext<'a>,
6208 aggs: Vec<Function<Aug>>,
6209 within_aggregate: bool,
6210 tables: BTreeMap<Function<Aug>, String>,
6211 table_disallowed_context: Vec<&'static str>,
6212 in_select_item: bool,
6213 id_gen: IdGen,
6214 err: Option<PlanError>,
6215}
6216
6217impl<'a> AggregateTableFuncVisitor<'a> {
6218 fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6219 AggregateTableFuncVisitor {
6220 scx,
6221 aggs: Vec::new(),
6222 within_aggregate: false,
6223 tables: BTreeMap::new(),
6224 table_disallowed_context: Vec::new(),
6225 in_select_item: false,
6226 id_gen: Default::default(),
6227 err: None,
6228 }
6229 }
6230
6231 fn into_result(
6232 self,
6233 ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6234 match self.err {
6235 Some(err) => Err(err),
6236 None => {
6237 let mut seen = BTreeSet::new();
6240 let aggs = self
6241 .aggs
6242 .into_iter()
6243 .filter(move |agg| seen.insert(agg.clone()))
6244 .collect();
6245 Ok((aggs, self.tables))
6246 }
6247 }
6248 }
6249}
6250
6251impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
6252 fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
6253 let item = match self.scx.get_item_by_resolved_name(&func.name) {
6254 Ok(i) => i,
6255 Err(_) => return,
6257 };
6258
6259 match item.func() {
6260 Ok(Func::Aggregate { .. }) if func.over.is_none() => {
6263 if self.within_aggregate {
6264 self.err = Some(sql_err!("nested aggregate functions are not allowed",));
6265 return;
6266 }
6267 self.aggs.push(func.clone());
6268 let Function {
6269 name: _,
6270 args,
6271 filter,
6272 over: _,
6273 distinct: _,
6274 } = func;
6275 if let Some(filter) = filter {
6276 self.visit_expr_mut(filter);
6277 }
6278 let old_within_aggregate = self.within_aggregate;
6279 self.within_aggregate = true;
6280 self.table_disallowed_context
6281 .push("aggregate function calls");
6282
6283 self.visit_function_args_mut(args);
6284
6285 self.within_aggregate = old_within_aggregate;
6286 self.table_disallowed_context.pop();
6287 }
6288 Ok(Func::Table { .. }) => {
6289 self.table_disallowed_context.push("other table functions");
6290 visit_mut::visit_function_mut(self, func);
6291 self.table_disallowed_context.pop();
6292 }
6293 _ => visit_mut::visit_function_mut(self, func),
6294 }
6295 }
6296
6297 fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
6298 }
6300
6301 fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
6302 let (disallowed_context, func) = match expr {
6303 Expr::Case { .. } => (Some("CASE"), None),
6304 Expr::HomogenizingFunction {
6305 function: HomogenizingFunction::Coalesce,
6306 ..
6307 } => (Some("COALESCE"), None),
6308 Expr::Function(func) if self.in_select_item => {
6309 let mut table_func = None;
6312 if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
6313 if let Ok(Func::Table { .. }) = item.func() {
6314 if let Some(context) = self.table_disallowed_context.last() {
6315 self.err = Some(sql_err!(
6316 "table functions are not allowed in {} (function {})",
6317 context,
6318 func.name
6319 ));
6320 return;
6321 }
6322 table_func = Some(func.clone());
6323 }
6324 }
6325 (None, table_func)
6328 }
6329 _ => (None, None),
6330 };
6331 if let Some(func) = func {
6332 visit_mut::visit_expr_mut(self, expr);
6334 if let Function {
6336 name: _,
6337 args: _,
6338 filter: None,
6339 over: None,
6340 distinct: false,
6341 } = &func
6342 {
6343 let unique_id = self.id_gen.allocate_id();
6345 let id = self
6346 .tables
6347 .entry(func)
6348 .or_insert_with(|| format!("table_func_{unique_id}"));
6349 *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
6352 }
6353 }
6354 if let Some(context) = disallowed_context {
6355 self.table_disallowed_context.push(context);
6356 }
6357
6358 visit_mut::visit_expr_mut(self, expr);
6359
6360 if disallowed_context.is_some() {
6361 self.table_disallowed_context.pop();
6362 }
6363 }
6364
6365 fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
6366 let old = self.in_select_item;
6367 self.in_select_item = true;
6368 visit_mut::visit_select_item_mut(self, si);
6369 self.in_select_item = old;
6370 }
6371}
6372
6373#[derive(Default)]
6374struct WindowFuncCollector {
6375 window_funcs: Vec<Expr<Aug>>,
6376}
6377
6378impl WindowFuncCollector {
6379 fn into_result(self) -> Vec<Expr<Aug>> {
6380 let mut seen = BTreeSet::new();
6382 let window_funcs_dedupped = self
6383 .window_funcs
6384 .into_iter()
6385 .filter(move |expr| seen.insert(expr.clone()))
6386 .rev()
6389 .collect();
6390 window_funcs_dedupped
6391 }
6392}
6393
6394impl Visit<'_, Aug> for WindowFuncCollector {
6395 fn visit_expr(&mut self, expr: &Expr<Aug>) {
6396 match expr {
6397 Expr::Function(func) => {
6398 if func.over.is_some() {
6399 self.window_funcs.push(expr.clone());
6400 }
6401 }
6402 _ => (),
6403 }
6404 visit::visit_expr(self, expr);
6405 }
6406
6407 fn visit_query(&mut self, _query: &Query<Aug>) {
6408 }
6410}
6411
6412#[derive(Debug, Eq, PartialEq, Copy, Clone)]
6414pub enum QueryLifetime {
6415 OneShot,
6417 Index,
6419 MaterializedView,
6421 Subscribe,
6423 View,
6425 Source,
6427}
6428
6429impl QueryLifetime {
6430 pub fn is_one_shot(&self) -> bool {
6434 let result = match self {
6435 QueryLifetime::OneShot => true,
6436 QueryLifetime::Index => false,
6437 QueryLifetime::MaterializedView => false,
6438 QueryLifetime::Subscribe => false,
6439 QueryLifetime::View => false,
6440 QueryLifetime::Source => false,
6441 };
6442 assert_eq!(!result, self.is_maintained());
6443 result
6444 }
6445
6446 pub fn is_maintained(&self) -> bool {
6449 match self {
6450 QueryLifetime::OneShot => false,
6451 QueryLifetime::Index => true,
6452 QueryLifetime::MaterializedView => true,
6453 QueryLifetime::Subscribe => true,
6454 QueryLifetime::View => true,
6455 QueryLifetime::Source => true,
6456 }
6457 }
6458
6459 pub fn allow_show(&self) -> bool {
6461 match self {
6462 QueryLifetime::OneShot => true,
6463 QueryLifetime::Index => false,
6464 QueryLifetime::MaterializedView => false,
6465 QueryLifetime::Subscribe => true, QueryLifetime::View => false,
6467 QueryLifetime::Source => false,
6468 }
6469 }
6470}
6471
6472#[derive(Debug, Clone)]
6474pub struct CteDesc {
6475 pub name: String,
6476 pub desc: RelationDesc,
6477}
6478
6479#[derive(Debug, Clone)]
6481pub struct QueryContext<'a> {
6482 pub scx: &'a StatementContext<'a>,
6484 pub lifetime: QueryLifetime,
6486 pub outer_scopes: Vec<Scope>,
6488 pub outer_relation_types: Vec<SqlRelationType>,
6490 pub ctes: BTreeMap<LocalId, CteDesc>,
6492 pub name_manager: Rc<RefCell<NameManager>>,
6494 pub recursion_guard: RecursionGuard,
6495}
6496
6497impl CheckedRecursion for QueryContext<'_> {
6498 fn recursion_guard(&self) -> &RecursionGuard {
6499 &self.recursion_guard
6500 }
6501}
6502
6503impl<'a> QueryContext<'a> {
6504 pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6505 QueryContext {
6506 scx,
6507 lifetime,
6508 outer_scopes: vec![],
6509 outer_relation_types: vec![],
6510 ctes: BTreeMap::new(),
6511 name_manager: Rc::new(RefCell::new(NameManager::new())),
6512 recursion_guard: RecursionGuard::with_limit(1024), }
6514 }
6515
6516 fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6517 expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6518 }
6519
6520 fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6523 let ctes = self.ctes.clone();
6524 let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6525 let outer_relation_types = iter::once(relation_type)
6526 .chain(self.outer_relation_types.clone())
6527 .collect();
6528 let name_manager = Rc::clone(&self.name_manager);
6530
6531 QueryContext {
6532 scx: self.scx,
6533 lifetime: self.lifetime,
6534 outer_scopes,
6535 outer_relation_types,
6536 ctes,
6537 name_manager,
6538 recursion_guard: self.recursion_guard.clone(),
6539 }
6540 }
6541
6542 fn empty_derived_context(&self) -> QueryContext<'a> {
6544 let scope = Scope::empty();
6545 let ty = SqlRelationType::empty();
6546 self.derived_context(scope, ty)
6547 }
6548
6549 pub fn resolve_table_name(
6552 &self,
6553 object: ResolvedItemName,
6554 ) -> Result<(HirRelationExpr, Scope), PlanError> {
6555 match object {
6556 ResolvedItemName::Item {
6557 id,
6558 full_name,
6559 version,
6560 ..
6561 } => {
6562 let name = full_name.into();
6563 let item = self.scx.get_item(&id).at_version(version);
6564 let desc = item
6565 .desc(&self.scx.catalog.resolve_full_name(item.name()))?
6566 .clone();
6567 let expr = HirRelationExpr::Get {
6568 id: Id::Global(item.global_id()),
6569 typ: desc.typ().clone(),
6570 };
6571
6572 let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6573
6574 Ok((expr, scope))
6575 }
6576 ResolvedItemName::Cte { id, name } => {
6577 let name = name.into();
6578 let cte = self.ctes.get(&id).unwrap();
6579 let expr = HirRelationExpr::Get {
6580 id: Id::Local(id),
6581 typ: cte.desc.typ().clone(),
6582 };
6583
6584 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6585
6586 Ok((expr, scope))
6587 }
6588 ResolvedItemName::ContinualTask { id, name } => {
6589 let cte = self.ctes.get(&id).unwrap();
6590 let expr = HirRelationExpr::Get {
6591 id: Id::Local(id),
6592 typ: cte.desc.typ().clone(),
6593 };
6594
6595 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6596
6597 Ok((expr, scope))
6598 }
6599 ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6600 }
6601 }
6602
6603 pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6606 self.scx.humanize_scalar_type(typ, postgres_compat)
6607 }
6608}
6609
6610#[derive(Debug, Clone)]
6612pub struct ExprContext<'a> {
6613 pub qcx: &'a QueryContext<'a>,
6614 pub name: &'a str,
6616 pub scope: &'a Scope,
6619 pub relation_type: &'a SqlRelationType,
6622 pub allow_aggregates: bool,
6624 pub allow_subqueries: bool,
6626 pub allow_parameters: bool,
6628 pub allow_windows: bool,
6630}
6631
6632impl CheckedRecursion for ExprContext<'_> {
6633 fn recursion_guard(&self) -> &RecursionGuard {
6634 &self.qcx.recursion_guard
6635 }
6636}
6637
6638impl<'a> ExprContext<'a> {
6639 pub fn catalog(&self) -> &dyn SessionCatalog {
6640 self.qcx.scx.catalog
6641 }
6642
6643 pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6644 let mut ecx = self.clone();
6645 ecx.name = name;
6646 ecx
6647 }
6648
6649 pub fn column_type<E>(&self, expr: &E) -> E::Type
6650 where
6651 E: AbstractExpr,
6652 {
6653 expr.typ(
6654 &self.qcx.outer_relation_types,
6655 self.relation_type,
6656 &self.qcx.scx.param_types.borrow(),
6657 )
6658 }
6659
6660 pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6661 where
6662 E: AbstractExpr,
6663 {
6664 self.column_type(expr).scalar_type()
6665 }
6666
6667 fn derived_query_context(&self) -> QueryContext<'_> {
6668 let mut scope = self.scope.clone();
6669 scope.lateral_barrier = true;
6670 self.qcx.derived_context(scope, self.relation_type.clone())
6671 }
6672
6673 pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6674 self.qcx.scx.require_feature_flag(flag)
6675 }
6676
6677 pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6678 &self.qcx.scx.param_types
6679 }
6680
6681 pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6684 self.qcx.scx.humanize_scalar_type(typ, postgres_compat)
6685 }
6686
6687 pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6688 self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6689 }
6690}
6691
6692#[derive(Debug, Clone)]
6698pub struct NameManager(BTreeSet<Arc<str>>);
6699
6700impl NameManager {
6701 pub fn new() -> Self {
6703 Self(BTreeSet::new())
6704 }
6705
6706 fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6709 let s = s.as_ref();
6710 if let Some(interned) = self.0.get(s) {
6711 Arc::clone(interned)
6712 } else {
6713 let interned: Arc<str> = Arc::from(s);
6714 self.0.insert(Arc::clone(&interned));
6715 interned
6716 }
6717 }
6718
6719 pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6722 self.intern(item.column_name.as_str())
6738 }
6739}
6740
6741#[cfg(test)]
6742mod test {
6743 use super::*;
6744
6745 #[mz_ore::test]
6750 pub fn test_name_manager_string_interning() {
6751 let mut nm = NameManager::new();
6752
6753 let orig_hi = "hi";
6754 let hi = nm.intern(orig_hi);
6755 let hello = nm.intern("hello");
6756
6757 assert_ne!(hi.as_ptr(), hello.as_ptr());
6758
6759 let hi2 = nm.intern("hi");
6761 assert_eq!(hi.as_ptr(), hi2.as_ptr());
6762
6763 let s = format!(
6765 "{}{}",
6766 hi.chars().nth(0).unwrap(),
6767 hi2.chars().nth(1).unwrap()
6768 );
6769 assert_ne!(orig_hi.as_ptr(), s.as_ptr());
6771
6772 let hi3 = nm.intern(s);
6773 assert_eq!(hi.as_ptr(), hi3.as_ptr());
6774 }
6775}