1use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::virtual_syntax::AlgExcept;
50use mz_expr::{
51 Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, TableFunc,
52 func as expr_func,
53};
54use mz_ore::assert_none;
55use mz_ore::collections::CollectionExt;
56use mz_ore::error::ErrorExt;
57use mz_ore::id_gen::IdGen;
58use mz_ore::option::FallibleMapExt;
59use mz_ore::stack::{CheckedRecursion, RecursionGuard};
60use mz_ore::str::StrExt;
61use mz_repr::adt::char::CharLength;
62use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
63use mz_repr::adt::timestamp::TimestampPrecision;
64use mz_repr::adt::varchar::VarCharMaxLength;
65use mz_repr::{
66 CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector, Row,
67 RowArena, SqlColumnType, SqlRelationType, SqlScalarType, UNKNOWN_COLUMN_NAME, strconv,
68};
69use mz_sql_parser::ast::display::AstDisplay;
70use mz_sql_parser::ast::visit::Visit;
71use mz_sql_parser::ast::visit_mut::{self, VisitMut};
72use mz_sql_parser::ast::{
73 AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
74 CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
75 Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
76 JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
77 MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
78 SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
79 TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
80 WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
81};
82use mz_sql_parser::ident;
83
84use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
85use crate::func::{self, Func, FuncSpec, TableFuncImpl};
86use crate::names::{
87 Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
88};
89use crate::plan::PlanError::InvalidWmrRecursionLimit;
90use crate::plan::error::PlanError;
91use crate::plan::hir::{
92 AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
93 BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
94 HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
95 ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
96};
97use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
98use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
99use crate::plan::statement::{StatementContext, StatementDesc, show};
100use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
101use crate::plan::{
102 Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
103 literal, transform_ast,
104};
105use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
106use crate::session::vars::{self, FeatureFlag};
107use crate::{ORDINALITY_COL_NAME, normalize};
108
/// The output of planning a root-level statement.
///
/// `E` is the planned expression payload: the query planners in this file
/// produce a `HirRelationExpr`, while the `RETURNING` clause of an `INSERT`
/// produces a `Vec<HirScalarExpr>`.
#[derive(Debug)]
pub struct PlannedRootQuery<E> {
    /// The planned expression(s).
    pub expr: E,
    /// Column names and types of the result.
    pub desc: RelationDesc,
    /// Ordering, limit, offset, and projection to apply to the rows produced
    /// by `expr`.
    pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
    /// The scope that planning produced for the result.
    pub scope: Scope,
}
116
117#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
126pub fn plan_root_query(
127 scx: &StatementContext,
128 mut query: Query<Aug>,
129 lifetime: QueryLifetime,
130) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
131 transform_ast::transform(scx, &mut query)?;
132 let mut qcx = QueryContext::root(scx, lifetime);
133 let PlannedQuery {
134 mut expr,
135 scope,
136 order_by,
137 limit,
138 offset,
139 project,
140 group_size_hints,
141 } = plan_query(&mut qcx, &query)?;
142
143 let mut finishing = RowSetFinishing {
144 limit,
145 offset,
146 project,
147 order_by,
148 };
149
150 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
156
157 if lifetime.is_maintained() {
158 expr.finish_maintained(&mut finishing, group_size_hints);
159 }
160
161 let typ = qcx.relation_type(&expr);
162 let typ = SqlRelationType::new(
163 finishing
164 .project
165 .iter()
166 .map(|i| typ.column_types[*i].clone())
167 .collect(),
168 );
169 let desc = RelationDesc::new(typ, scope.column_names());
170
171 Ok(PlannedRootQuery {
172 expr,
173 desc,
174 finishing,
175 scope,
176 })
177}
178
179#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
181pub fn plan_ct_query(
182 qcx: &mut QueryContext,
183 mut query: Query<Aug>,
184) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
185 transform_ast::transform(qcx.scx, &mut query)?;
186 let PlannedQuery {
187 mut expr,
188 scope,
189 order_by,
190 limit,
191 offset,
192 project,
193 group_size_hints,
194 } = plan_query(qcx, &query)?;
195
196 let mut finishing = RowSetFinishing {
197 limit,
198 offset,
199 project,
200 order_by,
201 };
202
203 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
209
210 expr.finish_maintained(&mut finishing, group_size_hints);
211
212 let typ = qcx.relation_type(&expr);
213 let typ = SqlRelationType::new(
214 finishing
215 .project
216 .iter()
217 .map(|i| typ.column_types[*i].clone())
218 .collect(),
219 );
220 let desc = RelationDesc::new(typ, scope.column_names());
221
222 Ok(PlannedRootQuery {
223 expr,
224 desc,
225 finishing,
226 scope,
227 })
228}
229
230fn try_push_projection_order_by(
241 expr: &mut HirRelationExpr,
242 project: &mut Vec<usize>,
243 order_by: &mut Vec<ColumnOrder>,
244) -> bool {
245 let mut unproject = vec![None; expr.arity()];
246 for (out_i, in_i) in project.iter().copied().enumerate() {
247 unproject[in_i] = Some(out_i);
248 }
249 if order_by
250 .iter()
251 .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
252 {
253 let trivial_project = (0..project.len()).collect();
254 *expr = expr.take().project(mem::replace(project, trivial_project));
255 for ob in order_by {
256 ob.column = unproject[ob.column].unwrap();
257 }
258 true
259 } else {
260 false
261 }
262}
263
/// Plans an `INSERT` into `table_name`.
///
/// On success returns a triple of:
///
/// * the catalog id of the target table,
/// * a relation expression whose columns line up with the table's columns
///   (source columns are cast to the target types, omitted columns are filled
///   in from the table's default expressions), and
/// * the planned `RETURNING` clause, as one scalar expression per output
///   column together with its description.
///
/// # Errors
///
/// Fails if the target is not a table, is not writeable, or is a system
/// table; if a listed column does not exist or is listed more than once; if
/// the number of source expressions does not fit the target columns; if a
/// source column cannot be assignment-cast to its target type; or if planning
/// any sub-expression fails.
pub fn plan_insert_query(
    scx: &StatementContext,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    source: InsertSource<Aug>,
    returning: Vec<SelectItem<Aug>>,
) -> Result<
    (
        CatalogItemId,
        HirRelationExpr,
        PlannedRootQuery<Vec<HirScalarExpr>>,
    ),
    PlanError,
> {
    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    let table = scx.get_item_by_resolved_name(&table_name)?;

    // Validate the insert target.
    if table.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot insert into {} '{}'",
            table.item_type(),
            table_name.full_name_str()
        );
    }
    let desc = table.desc(&scx.catalog.resolve_full_name(table.name()))?;
    // One default expression per table column; only writeable tables have them.
    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| {
            sql_err!(
                "cannot insert into non-writeable table '{}'",
                table_name.full_name_str()
            )
        })?
        .to_vec();

    for default in &mut defaults {
        transform_ast::transform(scx, default)?;
    }

    if table.id().is_system() {
        sql_bail!(
            "cannot insert into system table '{}'",
            table_name.full_name_str()
        );
    }

    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();

    // `source_types[i]` is the target type of the i-th source column and
    // `ordering[i]` is the index of the table column it is written to.
    let mut source_types = Vec::with_capacity(columns.len());
    let mut ordering = Vec::with_capacity(columns.len());

    if columns.is_empty() {
        // No explicit column list: assume the source covers the table's columns
        // in order. Both vectors are truncated below once the source's actual
        // arity is known.
        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
        ordering.extend(0..desc.arity());
    } else {
        let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
            .iter()
            .enumerate()
            .map(|(idx, (name, typ))| (name, (idx, typ)))
            .collect();

        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push(&typ.scalar_type);
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.quoted(),
                    table_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.quoted());
        }
    };

    let expr = match source {
        InsertSource::Query(mut query) => {
            transform_ast::transform(scx, &mut query)?;

            match query {
                // A bare `VALUES` list (no CTEs, ORDER BY, LIMIT or OFFSET) is
                // planned by `plan_values_insert`, which is given the target
                // column names and types.
                Query {
                    body: SetExpr::Values(Values(values)),
                    ctes,
                    order_by,
                    limit: None,
                    offset: None,
                } if ctes.is_empty() && order_by.is_empty() => {
                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
                    plan_values_insert(&qcx, &names, &source_types, &values)?
                }
                // Anything else is planned as an ordinary nested query.
                _ => {
                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
                    expr
                }
            }
        }
        // `DEFAULT VALUES`: a single row with no columns, so every table column
        // is filled from its default below.
        InsertSource::DefaultValues => {
            HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
        }
    };

    let expr_arity = expr.arity();

    // The source may have at most as many columns as were listed, or as the
    // table has when no columns were listed.
    let max_columns = if columns.is_empty() {
        desc.arity()
    } else {
        columns.len()
    };
    if expr_arity > max_columns {
        sql_bail!("INSERT has more expressions than target columns");
    }
    // With an explicit column list the source must supply every listed column.
    // (Without one, `columns.len()` is zero and this never fires.)
    if expr_arity < columns.len() {
        sql_bail!("INSERT has more target columns than expressions");
    }

    // Now that the source arity is known, drop the targets it does not cover.
    source_types.truncate(expr_arity);
    ordering.truncate(expr_arity);

    // Cast each source column to its target column's type using assignment
    // casts, reporting the offending column on failure.
    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
        sql_err!(
            "column {} is of type {} but expression is of type {}",
            desc.get_name(ordering[e.column]).quoted(),
            qcx.humanize_scalar_type(&e.target_type, false),
            qcx.humanize_scalar_type(&e.source_type, false),
        )
    })?;

    // Default expressions for omitted columns are appended after the source
    // columns (`map_exprs`); `project_key` then arranges everything into table
    // column order.
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());

    // Maps a table column index to the position of its value in the source.
    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();

    // `zip_eq` panics if the number of defaults differs from the table's arity.
    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
    for (col_idx, (col_typ, default)) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            project_key.push(*src_idx);
        } else {
            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
            project_key.push(expr_arity + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    let returning = {
        // `RETURNING` expressions are planned against the table's own columns,
        // qualified by the table's full name.
        let (scope, typ) = if let ResolvedItemName::Item {
            full_name,
            version: _,
            ..
        } = table_name
        {
            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
            let typ = desc.typ().clone();
            (scope, typ)
        } else {
            (Scope::empty(), SqlRelationType::empty())
        };
        let ecx = &ExprContext {
            qcx: &qcx,
            name: "RETURNING clause",
            scope: &scope,
            relation_type: &typ,
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: true,
            allow_windows: false,
        };
        let table_func_names = BTreeMap::new();
        let mut output_columns = vec![];
        let mut new_exprs = vec![];
        let mut new_type = SqlRelationType::empty();
        for mut si in returning {
            transform_ast::transform(scx, &mut si)?;
            // A single select item may expand to several output columns.
            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
                let expr = match &select_item {
                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
                };
                output_columns.push(column_name);
                let typ = ecx.column_type(&expr);
                new_type.column_types.push(typ);
                new_exprs.push(expr);
            }
        }
        let desc = RelationDesc::new(new_type, output_columns);
        let desc_arity = desc.arity();
        PlannedRootQuery {
            expr: new_exprs,
            desc,
            finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
            scope,
        }
    };

    Ok((
        table.id(),
        expr.map(map_exprs).project(project_key),
        returning,
    ))
}
480
481pub fn plan_copy_item(
492 scx: &StatementContext,
493 item_name: ResolvedItemName,
494 columns: Vec<Ident>,
495) -> Result<
496 (
497 CatalogItemId,
498 RelationDesc,
499 Vec<ColumnIndex>,
500 Option<MapFilterProject>,
501 ),
502 PlanError,
503> {
504 let item = scx.get_item_by_resolved_name(&item_name)?;
505 let fullname = scx.catalog.resolve_full_name(item.name());
506 let table_desc = item.desc(&fullname)?.into_owned();
507 let mut ordering = Vec::with_capacity(columns.len());
508
509 let mfp = if let Some(table_defaults) = item.writable_table_details() {
519 let mut table_defaults = table_defaults.to_vec();
520
521 for default in &mut table_defaults {
522 transform_ast::transform(scx, default)?;
523 }
524
525 let source_column_names: Vec<_> = columns
527 .iter()
528 .cloned()
529 .map(normalize::column_name)
530 .collect();
531
532 let mut default_exprs = Vec::new();
533 let mut project_keys = Vec::with_capacity(table_desc.arity());
534
535 let column_details = table_desc.iter().zip_eq(table_defaults);
538 for ((col_name, col_type), col_default) in column_details {
539 let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
540 if let Some(src_idx) = maybe_src_idx {
541 project_keys.push(src_idx);
542 } else {
543 let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
546 let mir = hir.lower_uncorrelated()?;
547 project_keys.push(source_column_names.len() + default_exprs.len());
548 default_exprs.push(mir);
549 }
550 }
551
552 let mfp = MapFilterProject::new(source_column_names.len())
553 .map(default_exprs)
554 .project(project_keys);
555 Some(mfp)
556 } else {
557 None
558 };
559
560 let source_desc = if columns.is_empty() {
562 let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
563 ordering.extend(indexes);
564
565 table_desc
567 } else {
568 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
569 let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
570 .iter_all()
571 .map(|(idx, name, typ)| (name, (*idx, typ)))
572 .collect();
573
574 let mut names = Vec::with_capacity(columns.len());
575 let mut source_types = Vec::with_capacity(columns.len());
576
577 for c in &columns {
578 if let Some((idx, typ)) = column_by_name.get(c) {
579 ordering.push(*idx);
580 source_types.push((*typ).clone());
581 names.push(c.clone());
582 } else {
583 sql_bail!(
584 "column {} of relation {} does not exist",
585 c.quoted(),
586 item_name.full_name_str().quoted()
587 );
588 }
589 }
590 if let Some(dup) = columns.iter().duplicates().next() {
591 sql_bail!("column {} specified more than once", dup.quoted());
592 }
593
594 RelationDesc::new(SqlRelationType::new(source_types), names)
596 };
597
598 Ok((item.id(), source_desc, ordering, mfp))
599}
600
601pub fn plan_copy_from(
605 scx: &StatementContext,
606 table_name: ResolvedItemName,
607 columns: Vec<Ident>,
608) -> Result<
609 (
610 CatalogItemId,
611 RelationDesc,
612 Vec<ColumnIndex>,
613 Option<MapFilterProject>,
614 ),
615 PlanError,
616> {
617 let table = scx.get_item_by_resolved_name(&table_name)?;
618
619 if table.item_type() != CatalogItemType::Table {
621 sql_bail!(
622 "cannot insert into {} '{}'",
623 table.item_type(),
624 table_name.full_name_str()
625 );
626 }
627
628 let _ = table.writable_table_details().ok_or_else(|| {
629 sql_err!(
630 "cannot insert into non-writeable table '{}'",
631 table_name.full_name_str()
632 )
633 })?;
634
635 if table.id().is_system() {
636 sql_bail!(
637 "cannot insert into system table '{}'",
638 table_name.full_name_str()
639 );
640 }
641 let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
642
643 Ok((id, desc, ordering, mfp))
644}
645
646pub fn plan_copy_from_rows(
649 pcx: &PlanContext,
650 catalog: &dyn SessionCatalog,
651 target_id: CatalogItemId,
652 target_name: String,
653 columns: Vec<ColumnIndex>,
654 rows: Vec<mz_repr::Row>,
655) -> Result<HirRelationExpr, PlanError> {
656 let scx = StatementContext::new(Some(pcx), catalog);
657
658 let table = catalog
660 .try_get_item(&target_id)
661 .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
662 .at_version(RelationVersionSelector::Latest);
663 let desc = table.desc(&catalog.resolve_full_name(table.name()))?;
664
665 let mut defaults = table
666 .writable_table_details()
667 .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
668 .to_vec();
669
670 for default in &mut defaults {
671 transform_ast::transform(&scx, default)?;
672 }
673
674 let column_types = columns
675 .iter()
676 .map(|x| desc.get_type(x).clone())
677 .map(|mut x| {
678 x.nullable = true;
681 x
682 })
683 .collect();
684 let typ = SqlRelationType::new(column_types);
685 let expr = HirRelationExpr::Constant {
686 rows,
687 typ: typ.clone(),
688 };
689
690 let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
696 if columns == default {
697 return Ok(expr);
698 }
699
700 let mut map_exprs = vec![];
702 let mut project_key = Vec::with_capacity(desc.arity());
703
704 let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
706
707 let column_details = desc.iter_all().zip_eq(defaults);
708 for ((col_idx, _col_name, col_typ), default) in column_details {
709 if let Some(src_idx) = col_to_source.get(&col_idx) {
710 project_key.push(*src_idx);
711 } else {
712 let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
713 project_key.push(typ.arity() + map_exprs.len());
714 map_exprs.push(hir);
715 }
716 }
717
718 Ok(expr.map(map_exprs).project(project_key))
719}
720
/// The plan for a statement that reads rows from a table and then writes
/// changes back to it, as produced for `DELETE` and `UPDATE`.
pub struct ReadThenWritePlan {
    /// Catalog id of the table being mutated.
    pub id: CatalogItemId,
    /// Relation expression selecting the rows the statement applies to.
    pub selection: HirRelationExpr,
    /// For each assigned column, keyed by column index, the expression that
    /// computes its new value. Empty for `DELETE`.
    pub assignments: BTreeMap<usize, HirScalarExpr>,
    /// Finishing to apply to the selected rows.
    pub finishing: RowSetFinishing,
}
733
734pub fn plan_delete_query(
735 scx: &StatementContext,
736 mut delete_stmt: DeleteStatement<Aug>,
737) -> Result<ReadThenWritePlan, PlanError> {
738 transform_ast::transform(scx, &mut delete_stmt)?;
739
740 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
741 plan_mutation_query_inner(
742 qcx,
743 delete_stmt.table_name,
744 delete_stmt.alias,
745 delete_stmt.using,
746 vec![],
747 delete_stmt.selection,
748 )
749}
750
751pub fn plan_update_query(
752 scx: &StatementContext,
753 mut update_stmt: UpdateStatement<Aug>,
754) -> Result<ReadThenWritePlan, PlanError> {
755 transform_ast::transform(scx, &mut update_stmt)?;
756
757 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
758
759 plan_mutation_query_inner(
760 qcx,
761 update_stmt.table_name,
762 update_stmt.alias,
763 vec![],
764 update_stmt.assignments,
765 update_stmt.selection,
766 )
767}
768
769pub fn plan_mutation_query_inner(
770 qcx: QueryContext,
771 table_name: ResolvedItemName,
772 alias: Option<TableAlias>,
773 using: Vec<TableWithJoins<Aug>>,
774 assignments: Vec<Assignment<Aug>>,
775 selection: Option<Expr<Aug>>,
776) -> Result<ReadThenWritePlan, PlanError> {
777 let (id, version) = match table_name {
779 ResolvedItemName::Item { id, version, .. } => (id, version),
780 _ => sql_bail!("cannot mutate non-user table"),
781 };
782
783 let item = qcx.scx.get_item(&id).at_version(version);
785 if item.item_type() != CatalogItemType::Table {
786 sql_bail!(
787 "cannot mutate {} '{}'",
788 item.item_type(),
789 table_name.full_name_str()
790 );
791 }
792 let _ = item.writable_table_details().ok_or_else(|| {
793 sql_err!(
794 "cannot mutate non-writeable table '{}'",
795 table_name.full_name_str()
796 )
797 })?;
798 if id.is_system() {
799 sql_bail!(
800 "cannot mutate system table '{}'",
801 table_name.full_name_str()
802 );
803 }
804
805 let (mut get, scope) = qcx.resolve_table_name(table_name)?;
807 let scope = plan_table_alias(scope, alias.as_ref())?;
808 let desc = item.desc(&qcx.scx.catalog.resolve_full_name(item.name()))?;
809 let relation_type = qcx.relation_type(&get);
810
811 if using.is_empty() {
812 if let Some(expr) = selection {
813 let ecx = &ExprContext {
814 qcx: &qcx,
815 name: "WHERE clause",
816 scope: &scope,
817 relation_type: &relation_type,
818 allow_aggregates: false,
819 allow_subqueries: true,
820 allow_parameters: true,
821 allow_windows: false,
822 };
823 let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
824 get = get.filter(vec![expr]);
825 }
826 } else {
827 get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
828 }
829
830 let mut sets = BTreeMap::new();
831 for Assignment { id, value } in assignments {
832 let name = normalize::column_name(id);
834 match desc.get_by_name(&name) {
835 Some((idx, typ)) => {
836 let ecx = &ExprContext {
837 qcx: &qcx,
838 name: "SET clause",
839 scope: &scope,
840 relation_type: &relation_type,
841 allow_aggregates: false,
842 allow_subqueries: false,
843 allow_parameters: true,
844 allow_windows: false,
845 };
846 let expr = plan_expr(ecx, &value)?.cast_to(
847 ecx,
848 CastContext::Assignment,
849 &typ.scalar_type,
850 )?;
851
852 if sets.insert(idx, expr).is_some() {
853 sql_bail!("column {} set twice", name)
854 }
855 }
856 None => sql_bail!("unknown column {}", name),
857 };
858 }
859
860 let finishing = RowSetFinishing {
861 order_by: vec![],
862 limit: None,
863 offset: 0,
864 project: (0..desc.arity()).collect(),
865 };
866
867 Ok(ReadThenWritePlan {
868 id,
869 selection: get,
870 finishing,
871 assignments: sets,
872 })
873}
874
/// Restricts `get` to the rows for which the `USING` tables, filtered by
/// `selection`, produce at least one row.
///
/// The result has the shape `get.filter([exists(using [WHERE selection])])`.
/// `outer_scope` is the scope of `get`; it is combined with the scope of the
/// `USING` tables so that `selection` can refer to columns of both.
///
/// # Errors
///
/// Fails if the `USING` tables cannot be planned, if the scopes of the `USING`
/// tables and `get` cannot be combined, or if `selection` cannot be planned as
/// a boolean expression.
fn handle_mutation_using_clause(
    qcx: &QueryContext,
    selection: Option<Expr<Aug>>,
    using: Vec<TableWithJoins<Aug>>,
    get: HirRelationExpr,
    outer_scope: Scope,
) -> Result<HirRelationExpr, PlanError> {
    // Cross join all `USING` tables together, starting from the join identity.
    // `get` takes no part in this, so the `USING` tables are planned without
    // access to the target table.
    let (mut using_rel_expr, using_scope) =
        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
            let (left, left_scope) = l;
            plan_join(
                qcx,
                left,
                left_scope,
                &Join {
                    relation: TableFactor::NestedJoin {
                        join: Box::new(twj),
                        alias: None,
                    },
                    join_operator: JoinOperator::CrossJoin,
                },
            )
        })?;

    if let Some(expr) = selection {
        // Build `using JOIN get` purely to obtain the scope and relation type
        // against which the WHERE clause is planned; `joined` itself is not
        // part of the result. The `USING` columns come first, followed by the
        // columns of `get`.
        let on = HirScalarExpr::literal_true();
        let joined = using_rel_expr
            .clone()
            .join(get.clone(), on, JoinKind::Inner);
        let joined_scope = using_scope.product(outer_scope)?;
        let joined_relation_type = qcx.relation_type(&joined);

        let ecx = &ExprContext {
            qcx,
            name: "WHERE clause",
            scope: &joined_scope,
            relation_type: &joined_relation_type,
            allow_aggregates: false,
            allow_subqueries: true,
            allow_parameters: true,
            allow_windows: false,
        };

        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;

        // Column references at or past the arity of the `USING` relation point
        // at `get`'s half of `joined`. Re-base them to index from zero and bump
        // their level by one, since the predicate ends up inside the `exists`
        // subquery that filters `get` below.
        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
        // Imported locally; the file-level imports already bring in a different
        // `Visit` trait from `mz_sql_parser`.
        use mz_expr::visit::Visit;
        expr.visit_mut_post(&mut |e| {
            if let HirScalarExpr::Column(c, _name) = e {
                if c.column >= using_rel_arity {
                    c.level += 1;
                    c.column -= using_rel_arity;
                };
            }
        })?;

        // The predicate filters the `USING` relation, not the joined relation.
        using_rel_expr = using_rel_expr.filter(vec![expr]);
    } else {
        // Without a WHERE clause the combined scope is still computed so that
        // any error from combining the two scopes is reported.
        let _joined_scope = using_scope.product(outer_scope)?;
    }
    // Keep the rows of `get` for which the (filtered) `USING` relation is
    // non-empty.
    Ok(get.filter(vec![using_rel_expr.exists()]))
}
976
/// The error returned by [`cast_relation`] when a column cannot be cast to its
/// target type.
#[derive(Debug)]
pub(crate) struct CastRelationError {
    /// Index of the column that could not be cast.
    pub(crate) column: usize,
    /// The type of that column in the input relation.
    pub(crate) source_type: SqlScalarType,
    /// The type the column was supposed to be cast to.
    pub(crate) target_type: SqlScalarType,
}
983
984pub(crate) fn cast_relation<'a, I>(
988 qcx: &QueryContext,
989 ccx: CastContext,
990 expr: HirRelationExpr,
991 target_types: I,
992) -> Result<HirRelationExpr, CastRelationError>
993where
994 I: IntoIterator<Item = &'a SqlScalarType>,
995{
996 let ecx = &ExprContext {
997 qcx,
998 name: "values",
999 scope: &Scope::empty(),
1000 relation_type: &qcx.relation_type(&expr),
1001 allow_aggregates: false,
1002 allow_subqueries: true,
1003 allow_parameters: true,
1004 allow_windows: false,
1005 };
1006 let mut map_exprs = vec![];
1007 let mut project_key = vec![];
1008 for (i, target_typ) in target_types.into_iter().enumerate() {
1009 let expr = HirScalarExpr::column(i);
1010 match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
1014 Ok(cast_expr) => {
1015 if expr == cast_expr {
1016 project_key.push(i);
1018 } else {
1019 project_key.push(ecx.relation_type.arity() + map_exprs.len());
1021 map_exprs.push(cast_expr);
1022 }
1023 }
1024 Err(_) => {
1025 return Err(CastRelationError {
1026 column: i,
1027 source_type: ecx.scalar_type(&expr),
1028 target_type: target_typ.clone(),
1029 });
1030 }
1031 }
1032 }
1033 Ok(expr.map(map_exprs).project(project_key))
1034}
1035
1036pub fn plan_as_of(
1039 scx: &StatementContext,
1040 as_of: Option<AsOf<Aug>>,
1041) -> Result<QueryWhen, PlanError> {
1042 match as_of {
1043 None => Ok(QueryWhen::Immediately),
1044 Some(as_of) => match as_of {
1045 AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1046 AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1047 },
1048 }
1049}
1050
1051pub fn plan_as_of_or_up_to(
1061 scx: &StatementContext,
1062 mut expr: Expr<Aug>,
1063) -> Result<mz_repr::Timestamp, PlanError> {
1064 let scope = Scope::empty();
1065 let desc = RelationDesc::empty();
1066 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1069 transform_ast::transform(scx, &mut expr)?;
1070 let ecx = &ExprContext {
1071 qcx: &qcx,
1072 name: "AS OF or UP TO",
1073 scope: &scope,
1074 relation_type: desc.typ(),
1075 allow_aggregates: false,
1076 allow_subqueries: false,
1077 allow_parameters: false,
1078 allow_windows: false,
1079 };
1080 let hir = plan_expr(ecx, &expr)?.cast_to(
1081 ecx,
1082 CastContext::Assignment,
1083 &SqlScalarType::MzTimestamp,
1084 )?;
1085 if hir.contains_unmaterializable() {
1086 bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1087 }
1088 let timestamp = hir
1095 .into_literal_mz_timestamp()
1096 .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1097 Ok(timestamp)
1098}
1099
1100pub fn plan_secret_as(
1102 scx: &StatementContext,
1103 mut expr: Expr<Aug>,
1104) -> Result<MirScalarExpr, PlanError> {
1105 let scope = Scope::empty();
1106 let desc = RelationDesc::empty();
1107 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1108
1109 transform_ast::transform(scx, &mut expr)?;
1110
1111 let ecx = &ExprContext {
1112 qcx: &qcx,
1113 name: "AS",
1114 scope: &scope,
1115 relation_type: desc.typ(),
1116 allow_aggregates: false,
1117 allow_subqueries: false,
1118 allow_parameters: false,
1119 allow_windows: false,
1120 };
1121 let expr = plan_expr(ecx, &expr)?
1122 .type_as(ecx, &SqlScalarType::Bytes)?
1123 .lower_uncorrelated()?;
1124 Ok(expr)
1125}
1126
1127pub fn plan_webhook_validate_using(
1129 scx: &StatementContext,
1130 validate_using: CreateWebhookSourceCheck<Aug>,
1131) -> Result<WebhookValidation, PlanError> {
1132 let qcx = QueryContext::root(scx, QueryLifetime::Source);
1133
1134 let CreateWebhookSourceCheck {
1135 options,
1136 using: mut expr,
1137 } = validate_using;
1138
1139 let mut column_typs = vec![];
1140 let mut column_names = vec![];
1141
1142 let (bodies, headers, secrets) = options
1143 .map(|o| (o.bodies, o.headers, o.secrets))
1144 .unwrap_or_default();
1145
1146 let mut body_tuples = vec![];
1148 for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1149 let scalar_type = use_bytes
1150 .then_some(SqlScalarType::Bytes)
1151 .unwrap_or(SqlScalarType::String);
1152 let name = alias
1153 .map(|a| a.into_string())
1154 .unwrap_or_else(|| "body".to_string());
1155
1156 column_typs.push(SqlColumnType {
1157 scalar_type,
1158 nullable: false,
1159 });
1160 column_names.push(name);
1161
1162 let column_idx = column_typs.len() - 1;
1164 assert_eq!(
1166 column_idx,
1167 column_names.len() - 1,
1168 "body column names and types don't match"
1169 );
1170 body_tuples.push((column_idx, use_bytes));
1171 }
1172
1173 let mut header_tuples = vec![];
1175
1176 for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1177 let value_type = use_bytes
1178 .then_some(SqlScalarType::Bytes)
1179 .unwrap_or(SqlScalarType::String);
1180 let name = alias
1181 .map(|a| a.into_string())
1182 .unwrap_or_else(|| "headers".to_string());
1183
1184 column_typs.push(SqlColumnType {
1185 scalar_type: SqlScalarType::Map {
1186 value_type: Box::new(value_type),
1187 custom_id: None,
1188 },
1189 nullable: false,
1190 });
1191 column_names.push(name);
1192
1193 let column_idx = column_typs.len() - 1;
1195 assert_eq!(
1197 column_idx,
1198 column_names.len() - 1,
1199 "header column names and types don't match"
1200 );
1201 header_tuples.push((column_idx, use_bytes));
1202 }
1203
1204 let mut validation_secrets = vec![];
1206
1207 for CreateWebhookSourceSecret {
1208 secret,
1209 alias,
1210 use_bytes,
1211 } in secrets
1212 {
1213 let scalar_type = use_bytes
1215 .then_some(SqlScalarType::Bytes)
1216 .unwrap_or(SqlScalarType::String);
1217
1218 column_typs.push(SqlColumnType {
1219 scalar_type,
1220 nullable: false,
1221 });
1222 let ResolvedItemName::Item {
1223 id,
1224 full_name: FullItemName { item, .. },
1225 ..
1226 } = secret
1227 else {
1228 return Err(PlanError::InvalidSecret(Box::new(secret)));
1229 };
1230
1231 let name = if let Some(alias) = alias {
1233 alias.into_string()
1234 } else {
1235 item
1236 };
1237 column_names.push(name);
1238
1239 let column_idx = column_typs.len() - 1;
1242 assert_eq!(
1244 column_idx,
1245 column_names.len() - 1,
1246 "column names and types don't match"
1247 );
1248
1249 validation_secrets.push(WebhookValidationSecret {
1250 id,
1251 column_idx,
1252 use_bytes,
1253 });
1254 }
1255
1256 let relation_typ = SqlRelationType::new(column_typs);
1257 let desc = RelationDesc::new(relation_typ, column_names.clone());
1258 let scope = Scope::from_source(None, column_names);
1259
1260 transform_ast::transform(scx, &mut expr)?;
1261
1262 let ecx = &ExprContext {
1263 qcx: &qcx,
1264 name: "CHECK",
1265 scope: &scope,
1266 relation_type: desc.typ(),
1267 allow_aggregates: false,
1268 allow_subqueries: false,
1269 allow_parameters: false,
1270 allow_windows: false,
1271 };
1272 let expr = plan_expr(ecx, &expr)?
1273 .type_as(ecx, &SqlScalarType::Bool)?
1274 .lower_uncorrelated()?;
1275 let validation = WebhookValidation {
1276 expression: expr,
1277 relation_desc: desc,
1278 bodies: body_tuples,
1279 headers: header_tuples,
1280 secrets: validation_secrets,
1281 };
1282 Ok(validation)
1283}
1284
1285pub fn plan_default_expr(
1286 scx: &StatementContext,
1287 expr: &Expr<Aug>,
1288 target_ty: &SqlScalarType,
1289) -> Result<HirScalarExpr, PlanError> {
1290 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1291 let ecx = &ExprContext {
1292 qcx: &qcx,
1293 name: "DEFAULT expression",
1294 scope: &Scope::empty(),
1295 relation_type: &SqlRelationType::empty(),
1296 allow_aggregates: false,
1297 allow_subqueries: false,
1298 allow_parameters: false,
1299 allow_windows: false,
1300 };
1301 let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1302 Ok(hir)
1303}
1304
1305pub fn plan_params<'a>(
1306 scx: &'a StatementContext,
1307 params: Vec<Expr<Aug>>,
1308 desc: &StatementDesc,
1309) -> Result<Params, PlanError> {
1310 if params.len() != desc.param_types.len() {
1311 sql_bail!(
1312 "expected {} params, got {}",
1313 desc.param_types.len(),
1314 params.len()
1315 );
1316 }
1317
1318 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1319
1320 let mut datums = Row::default();
1321 let mut packer = datums.packer();
1322 let mut actual_types = Vec::new();
1323 let temp_storage = &RowArena::new();
1324 for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1325 transform_ast::transform(scx, &mut expr)?;
1326
1327 let ecx = execute_expr_context(&qcx);
1328 let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1329 let actual_ty = ecx.scalar_type(&ex);
1330 if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1331 return Err(PlanError::WrongParameterType(
1332 i + 1,
1333 ecx.humanize_scalar_type(expected_ty, false),
1334 ecx.humanize_scalar_type(&actual_ty, false),
1335 ));
1336 }
1337 let ex = ex.lower_uncorrelated()?;
1338 let evaled = ex.eval(&[], temp_storage)?;
1339 packer.push(evaled);
1340 actual_types.push(actual_ty);
1341 }
1342 Ok(Params {
1343 datums,
1344 execute_types: actual_types,
1345 expected_types: desc.param_types.clone(),
1346 })
1347}
1348
/// Lazily-initialized empty [`Scope`], borrowed by `execute_expr_context` as
/// the scope of the `EXECUTE` expression context.
static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
/// Lazily-initialized empty [`SqlRelationType`], borrowed by
/// `execute_expr_context` as the relation type of the `EXECUTE` expression
/// context.
static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1351
1352pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1354 ExprContext {
1355 qcx,
1356 name: "EXECUTE",
1357 scope: &EXECUTE_CONTEXT_SCOPE,
1358 relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1359 allow_aggregates: false,
1360 allow_subqueries: false,
1361 allow_parameters: false,
1362 allow_windows: false,
1363 }
1364}
1365
/// The cast context used when checking whether a supplied parameter's type can
/// be cast to the expected parameter type (see `plan_params`). Lazily
/// initialized to `CastContext::Assignment`.
pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
    LazyLock::new(|| CastContext::Assignment);
1372
1373pub fn plan_index_exprs<'a>(
1374 scx: &'a StatementContext,
1375 on_desc: &RelationDesc,
1376 exprs: Vec<Expr<Aug>>,
1377) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1378 let scope = Scope::from_source(None, on_desc.iter_names());
1379 let qcx = QueryContext::root(scx, QueryLifetime::Index);
1380
1381 let ecx = &ExprContext {
1382 qcx: &qcx,
1383 name: "CREATE INDEX",
1384 scope: &scope,
1385 relation_type: on_desc.typ(),
1386 allow_aggregates: false,
1387 allow_subqueries: false,
1388 allow_parameters: false,
1389 allow_windows: false,
1390 };
1391 let mut out = vec![];
1392 for mut expr in exprs {
1393 transform_ast::transform(scx, &mut expr)?;
1394 let expr = plan_expr_or_col_index(ecx, &expr)?;
1395 let mut expr = expr.lower_uncorrelated()?;
1396 expr.reduce(&on_desc.typ().column_types);
1397 out.push(expr);
1398 }
1399 Ok(out)
1400}
1401
1402fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1403 match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1404 Some(column) => Ok(HirScalarExpr::column(column)),
1405 _ => plan_expr(ecx, e)?.type_as_any(ecx),
1406 }
1407}
1408
1409fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1410 match e {
1411 Expr::Value(Value::Number(n)) => {
1412 let n = n.parse::<usize>().map_err(|e| {
1413 sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1414 })?;
1415 if n < 1 || n > max {
1416 sql_bail!(
1417 "column reference {} in {} is out of range (1 - {})",
1418 n,
1419 name,
1420 max
1421 );
1422 }
1423 Ok(Some(n - 1))
1424 }
1425 _ => Ok(None),
1426 }
1427}
1428
/// The pieces produced by planning a query (see `plan_query`).
///
/// `plan_query_inner` returns `order_by`, `limit`, `offset`, and `project` as
/// separate fields rather than applying them to `expr`. `plan_nested_query`
/// in this file shows how they are consumed: a `top_k` built from `order_by`,
/// `limit`, and `offset` is applied first (when needed), followed by a
/// projection onto `project`.
struct PlannedQuery {
    /// The planned relation expression, including any `Let`/`LetRec` bindings
    /// introduced for CTEs.
    expr: HirRelationExpr,
    /// The scope returned alongside the planned expression.
    scope: Scope,
    /// The planned `ORDER BY` column orderings.
    order_by: Vec<ColumnOrder>,
    /// The planned `LIMIT` expression, or `None` if the query has no `LIMIT`.
    limit: Option<HirScalarExpr>,
    /// The planned `OFFSET` expression; a literal `0` when the query has no
    /// `OFFSET`.
    offset: HirScalarExpr,
    /// The column indices of `expr` to project.
    project: Vec<usize>,
    /// Group size hints extracted from the query's `SELECT` options (default
    /// hints for non-`SELECT` bodies).
    group_size_hints: GroupSizeHints,
}
1443
1444fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1445 qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1446}
1447
/// Plans a query: its CTEs, `LIMIT`, `OFFSET`, body, and `ORDER BY`.
///
/// CTE bindings are installed into `qcx.ctes` by `plan_ctes` before the rest
/// of the query is planned. Once the body is planned, the bindings are removed
/// from `qcx.ctes` again (re-installing whatever they shadowed) and wrapped
/// around the planned expression as `Let` or `LetRec` nodes.
///
/// The returned [`PlannedQuery`] carries `limit`, `offset`, `order_by`, and
/// `project` as separate fields; this function does not apply them to the
/// planned expression.
///
/// # Errors
///
/// Fails if any sub-plan fails, if a constant `LIMIT` is negative or does not
/// evaluate to an `Int64` or `NULL`, if a non-constant `LIMIT` is used without
/// the corresponding feature flag, if `WITH TIES` is requested, if `OFFSET` is
/// neither constant nor parameterized, or if the recursion-limit options of a
/// mutually recursive CTE block are invalid.
fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    // Plan the CTEs first. This inserts their descriptions into `qcx.ctes` and
    // returns, per CTE, its id, planned value, and any binding it shadowed.
    let cte_bindings = plan_ctes(qcx, q)?;

    let limit = match &q.limit {
        None => None,
        Some(Limit {
            quantity,
            with_ties: false,
        }) => {
            // LIMIT is planned against an empty scope: it cannot reference
            // columns, but subqueries and parameters are allowed.
            let ecx = &ExprContext {
                qcx,
                name: "LIMIT",
                scope: &Scope::empty(),
                relation_type: &SqlRelationType::empty(),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let limit = plan_expr(ecx, quantity)?;
            let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;

            let limit = if limit.is_constant() {
                // Constant limits are evaluated eagerly so that they can be
                // validated and replaced by a literal.
                let arena = RowArena::new();
                let limit = limit.lower_uncorrelated()?;

                match limit.eval(&[], &arena)? {
                    d @ Datum::Int64(v) if v >= 0 => {
                        HirScalarExpr::literal(d, SqlScalarType::Int64)
                    }
                    d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
                }
            } else {
                // Non-constant LIMIT expressions are gated behind a feature flag.
                qcx.scx
                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
                limit
            };

            Some(limit)
        }
        Some(Limit {
            quantity: _,
            with_ties: true,
        }) => bail_unsupported!("FETCH ... WITH TIES"),
    };

    let offset = match &q.offset {
        // No OFFSET clause is represented as a literal zero.
        None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
        Some(offset) => {
            // Unlike LIMIT, OFFSET does not allow subqueries.
            let ecx = &ExprContext {
                qcx,
                name: "OFFSET",
                scope: &Scope::empty(),
                relation_type: &SqlRelationType::empty(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: true,
                allow_windows: false,
            };
            let offset = plan_expr(ecx, offset)?;
            let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;

            let offset = if offset.is_constant() {
                // Reduce to a validated, non-negative literal (or error out).
                let offset_value = offset_into_value(offset)?;
                HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
            } else {
                // A non-constant OFFSET is only accepted when it contains
                // parameters.
                if !offset.contains_parameters() {
                    return Err(PlanError::InvalidOffset(format!(
                        "must be simplifiable to a constant, possibly after parameter binding, got {}",
                        offset
                    )));
                }
                offset
            };
            offset
        }
    };

    let mut planned_query = match &q.body {
        SetExpr::Select(s) => {
            // Extract the group size hints from the SELECT options.
            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;

            // For a plain SELECT, the ORDER BY is planned together with the
            // SELECT itself.
            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
            PlannedQuery {
                expr: plan.expr,
                scope: plan.scope,
                order_by: plan.order_by,
                project: plan.project,
                limit,
                offset,
                group_size_hints,
            }
        }
        _ => {
            // For every other body, plan the set expression first and then
            // plan the ORDER BY against its output columns.
            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
            let ecx = &ExprContext {
                qcx,
                name: "ORDER BY clause of a set expression",
                scope: &scope,
                relation_type: &qcx.relation_type(&expr),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
            // Project onto the set expression's own columns, i.e. the columns
            // that exist before `map_exprs` is applied below.
            let project = (0..ecx.relation_type.arity()).collect();
            PlannedQuery {
                expr: expr.map(map_exprs),
                scope,
                order_by,
                limit,
                project,
                offset,
                group_size_hints: GroupSizeHints::default(),
            }
        }
    };

    // Wrap the planned expression in bindings for the CTEs, and restore any
    // bindings in `qcx.ctes` that the CTEs shadowed.
    match &q.ctes {
        CteBlock::Simple(_) => {
            // Iterate in reverse so that the first CTE ends up as the
            // outermost `Let`.
            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    planned_query.expr = HirRelationExpr::Let {
                        name: cte.name,
                        id: id.clone(),
                        value: Box::new(value),
                        body: Box::new(planned_query.expr),
                    };
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
        }
        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
            let MutRecBlockOptionExtracted {
                recursion_limit,
                return_at_recursion_limit,
                error_at_recursion_limit,
                seen: _,
            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
            // At most one of the three recursion-limit options may be given.
            // Each maps to a `(max_iters, return_at_limit)` pair, which is
            // then validated (the limit must be non-zero) and converted into a
            // `LetRecLimit`.
            let limit = match (recursion_limit, return_at_recursion_limit, error_at_recursion_limit) {
                (None, None, None) => None,
                (Some(max_iters), None, None) => Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT)),
                (None, Some(max_iters), None) => Some((max_iters, true)),
                (None, None, Some(max_iters)) => Some((max_iters, false)),
                _ => {
                    return Err(InvalidWmrRecursionLimit("More than one recursion limit given. Please give at most one of RECURSION LIMIT, ERROR AT RECURSION LIMIT, RETURN AT RECURSION LIMIT.".to_owned()));
                }
            }.try_map(|(max_iters, return_at_limit)| Ok::<LetRecLimit, PlanError>(LetRecLimit {
                max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit("Recursion limit has to be greater than 0.".to_owned()))?,
                return_at_limit: *return_at_limit,
            }))?;

            // Collect all bindings (in declaration order) into one `LetRec`.
            let mut bindings = Vec::new();
            for (id, value, shadowed_val) in cte_bindings.into_iter() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    bindings.push((cte.name, id, value, cte.desc.typ().clone()));
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
            // Only introduce a `LetRec` when there is something to bind.
            if !bindings.is_empty() {
                planned_query.expr = HirRelationExpr::LetRec {
                    limit,
                    bindings,
                    body: Box::new(planned_query.expr),
                }
            }
        }
    }

    Ok(planned_query)
}
1640
1641pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1643 let offset = offset
1644 .try_into_literal_int64()
1645 .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1646 if offset < 0 {
1647 return Err(PlanError::InvalidOffset(format!(
1648 "must not be negative, got {}",
1649 offset
1650 )));
1651 }
1652 Ok(offset)
1653}
1654
// Generates `MutRecBlockOptionExtracted`, which `plan_query_inner` builds from
// the options of a mutually recursive CTE block via `try_from`. Its fields
// `recursion_limit`, `return_at_recursion_limit`, and
// `error_at_recursion_limit` (plus `seen`) are destructured there; each of the
// three options carries a `u64`.
generate_extracted_config!(
    MutRecBlockOption,
    (RecursionLimit, u64),
    (ReturnAtRecursionLimit, u64),
    (ErrorAtRecursionLimit, u64)
);
1661
1662pub fn plan_ctes(
1667 qcx: &mut QueryContext,
1668 q: &Query<Aug>,
1669) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1670 let mut result = Vec::new();
1672 let mut shadowed_descs = BTreeMap::new();
1675
1676 if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1678 sql_bail!(
1679 "WITH query name {} specified more than once",
1680 normalize::ident_ref(ident).quoted()
1681 )
1682 }
1683
1684 match &q.ctes {
1685 CteBlock::Simple(ctes) => {
1686 for cte in ctes.iter() {
1688 let cte_name = normalize::ident(cte.alias.name.clone());
1689 let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1690 let typ = qcx.relation_type(&val);
1691 let mut desc = RelationDesc::new(typ, scope.column_names());
1692 plan_utils::maybe_rename_columns(
1693 format!("CTE {}", cte.alias.name),
1694 &mut desc,
1695 &cte.alias.columns,
1696 )?;
1697 let shadowed = qcx.ctes.insert(
1699 cte.id,
1700 CteDesc {
1701 name: cte_name,
1702 desc,
1703 },
1704 );
1705
1706 result.push((cte.id, val, shadowed));
1707 }
1708 }
1709 CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1710 for cte in ctes.iter() {
1712 let cte_name = normalize::ident(cte.name.clone());
1713 let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1714 for column in cte.columns.iter() {
1715 desc_columns.push((
1716 normalize::column_name(column.name.clone()),
1717 SqlColumnType {
1718 scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1719 nullable: true,
1720 },
1721 ));
1722 }
1723 let desc = RelationDesc::from_names_and_types(desc_columns);
1724 let shadowed = qcx.ctes.insert(
1725 cte.id,
1726 CteDesc {
1727 name: cte_name,
1728 desc,
1729 },
1730 );
1731 if let Some(shadowed) = shadowed {
1733 shadowed_descs.insert(cte.id, shadowed);
1734 }
1735 }
1736
1737 for cte in ctes.iter() {
1739 let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1740
1741 let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1742
1743 if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1744 sql_bail!(
1747 "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1748 );
1749 }
1750
1751 if !proposed_typ.keys.is_empty() {
1752 sql_bail!("[internal error]: WMR CTEs do not support keys");
1755 }
1756
1757 let derived_typ = qcx.relation_type(&val);
1759
1760 let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1761 let cte_name = normalize::ident(cte.name.clone());
1762 let proposed_typ = proposed_typ
1763 .column_types
1764 .iter()
1765 .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1766 .collect::<Vec<_>>();
1767 let inferred_typ = derived_typ
1768 .column_types
1769 .iter()
1770 .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1771 .collect::<Vec<_>>();
1772 Err(PlanError::RecursiveTypeMismatch(
1773 cte_name,
1774 proposed_typ,
1775 inferred_typ,
1776 ))
1777 };
1778
1779 if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1780 return type_err(proposed_typ, derived_typ);
1781 }
1782
1783 let val = match cast_relation(
1785 qcx,
1786 CastContext::Assignment,
1791 val,
1792 proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1793 ) {
1794 Ok(val) => val,
1795 Err(_) => return type_err(proposed_typ, derived_typ),
1796 };
1797
1798 result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1799 }
1800 }
1801 }
1802
1803 Ok(result)
1804}
1805
1806pub fn plan_nested_query(
1807 qcx: &mut QueryContext,
1808 q: &Query<Aug>,
1809) -> Result<(HirRelationExpr, Scope), PlanError> {
1810 let PlannedQuery {
1811 mut expr,
1812 scope,
1813 order_by,
1814 limit,
1815 offset,
1816 project,
1817 group_size_hints,
1818 } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1819 if limit.is_some()
1820 || !offset
1821 .clone()
1822 .try_into_literal_int64()
1823 .is_ok_and(|offset| offset == 0)
1824 {
1825 expr = HirRelationExpr::top_k(
1826 expr,
1827 vec![],
1828 order_by,
1829 limit,
1830 offset,
1831 group_size_hints.limit_input_group_size,
1832 );
1833 }
1834 Ok((expr.project(project), scope))
1835}
1836
/// Plans a set expression: a `SELECT`, a set operation (`UNION`, `EXCEPT`,
/// `INTERSECT`), a `VALUES` list, a `TABLE` reference, a parenthesized query,
/// or a `SHOW` statement.
///
/// Returns the planned relation expression together with the scope describing
/// its columns.
///
/// # Errors
///
/// Fails if any sub-plan fails, if the two inputs of a set operation have a
/// different number of columns or column types that cannot be unified, if a
/// `SHOW` statement is used where the query lifetime does not allow it, or if
/// the `SHOW` variant is unsupported in this position.
fn plan_set_expr(
    qcx: &mut QueryContext,
    q: &SetExpr<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    match q {
        SetExpr::Select(select) => {
            // No ORDER BY is passed down here, so the resulting plan is
            // asserted to have none either.
            let order_by_exprs = Vec::new();
            let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
            assert!(plan.order_by.is_empty());
            Ok((plan.expr.project(plan.project), plan.scope))
        }
        SetExpr::SetOperation {
            op,
            all,
            left,
            right,
        } => {
            // Plan both inputs.
            let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
            let (right_expr, right_scope) =
                qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;

            // Both inputs must have the same number of columns.
            let left_type = qcx.relation_type(&left_expr);
            let right_type = qcx.relation_type(&right_expr);
            if left_type.arity() != right_type.arity() {
                sql_bail!(
                    "each {} query must have the same number of columns: {} vs {}",
                    op,
                    left_type.arity(),
                    right_type.arity(),
                );
            }

            // Expression contexts (named after the operator) used to plan the
            // casts that unify the column types of the two inputs.
            let left_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &left_scope,
                relation_type: &left_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let right_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &right_scope,
                relation_type: &right_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let mut left_casts = vec![];
            let mut right_casts = vec![];
            // For each column position, pick a common type and plan an
            // implicit cast of both sides' column to it.
            for (i, (left_type, right_type)) in left_type
                .column_types
                .iter()
                .zip_eq(right_type.column_types.iter())
                .enumerate()
            {
                let types = &[
                    CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
                    CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
                ];
                let target =
                    typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
                match typeconv::plan_cast(
                    left_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => left_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_scalar_type(&left_type.scalar_type, false),
                        qcx.humanize_scalar_type(&target, false),
                    ),
                }
                match typeconv::plan_cast(
                    right_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => right_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_scalar_type(&target, false),
                        qcx.humanize_scalar_type(&right_type.scalar_type, false),
                    ),
                }
            }
            // Only add a map/project when at least one cast is not simply the
            // plain column reference. The projection keeps the `arity` columns
            // that follow the original ones, which is where `map` is expected
            // to place the cast results.
            let lhs = if left_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
                left_expr.map(left_casts).project(project_key)
            } else {
                left_expr
            };
            let rhs = if right_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
                right_expr.map(right_casts).project(project_key)
            } else {
                right_expr
            };

            let relation_expr = match op {
                SetOperator::Union => {
                    // Without ALL, duplicates are removed from the union.
                    if *all {
                        lhs.union(rhs)
                    } else {
                        lhs.union(rhs).distinct()
                    }
                }
                SetOperator::Except => Hir::except(all, lhs, rhs),
                SetOperator::Intersect => {
                    // Expressed as `lhs - threshold(lhs - rhs)`; note that
                    // this uses the left-hand expression twice.
                    // NOTE(review): `threshold` presumably discards rows with
                    // non-positive multiplicity; confirm against its definition.
                    let left_clone = lhs.clone();
                    if *all {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                    } else {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                            .distinct()
                    }
                }
            };
            // The output column names are taken from the left input.
            let scope = Scope::from_source(
                None,
                left_scope.column_names(),
            );

            Ok((relation_expr, scope))
        }
        SetExpr::Values(Values(values)) => plan_values(qcx, values),
        SetExpr::Table(name) => {
            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
            Ok((expr, scope))
        }
        SetExpr::Query(query) => {
            let (expr, scope) = plan_nested_query(qcx, query)?;
            Ok((expr, scope))
        }
        SetExpr::Show(stmt) => {
            // SHOW is only permitted for query lifetimes that allow it.
            if !qcx.lifetime.allow_show() {
                return Err(PlanError::ShowCommandInView);
            }

            /// Turns the single row of a `SHOW CREATE` plan into a constant
            /// relation, using the statement's relation description for the
            /// column types and names.
            ///
            /// Panics if `desc` has no relation description.
            fn to_hirscope(
                plan: ShowCreatePlan,
                desc: StatementDesc,
            ) -> Result<(HirRelationExpr, Scope), PlanError> {
                let rows = vec![plan.row.iter().collect::<Vec<_>>()];
                let desc = desc.relation_desc.expect("must exist");
                let expr = HirRelationExpr::constant(rows, desc.typ().clone());
                let scope = Scope::from_source(None, desc.iter_names());
                Ok((expr, scope))
            }

            // `SHOW CREATE ...` variants are planned to a constant relation via
            // `to_hirscope`; `SHOW COLUMNS` and `SHOW <objects>` go through
            // `plan_hir`.
            match stmt.clone() {
                ShowStatement::ShowColumns(stmt) => {
                    show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
                    show::plan_show_create_connection(qcx.scx, stmt.clone())?,
                    show::describe_show_create_connection(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
                    show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
                    show::describe_show_create_cluster(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
                    show::plan_show_create_index(qcx.scx, stmt.clone())?,
                    show::describe_show_create_index(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSink(stmt) => to_hirscope(
                    show::plan_show_create_sink(qcx.scx, stmt.clone())?,
                    show::describe_show_create_sink(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSource(stmt) => to_hirscope(
                    show::plan_show_create_source(qcx.scx, stmt.clone())?,
                    show::describe_show_create_source(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateTable(stmt) => to_hirscope(
                    show::plan_show_create_table(qcx.scx, stmt.clone())?,
                    show::describe_show_create_table(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateView(stmt) => to_hirscope(
                    show::plan_show_create_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
                    show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_materialized_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowObjects(stmt) => {
                    show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
                ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
            }
        }
    }
}
2078
2079fn plan_values(
2081 qcx: &QueryContext,
2082 values: &[Vec<Expr<Aug>>],
2083) -> Result<(HirRelationExpr, Scope), PlanError> {
2084 assert!(!values.is_empty());
2085
2086 let ecx = &ExprContext {
2087 qcx,
2088 name: "VALUES",
2089 scope: &Scope::empty(),
2090 relation_type: &SqlRelationType::empty(),
2091 allow_aggregates: false,
2092 allow_subqueries: true,
2093 allow_parameters: true,
2094 allow_windows: false,
2095 };
2096
2097 let ncols = values[0].len();
2098 let nrows = values.len();
2099
2100 let mut cols = vec![vec![]; ncols];
2103 for row in values {
2104 if row.len() != ncols {
2105 sql_bail!(
2106 "VALUES expression has varying number of columns: {} vs {}",
2107 row.len(),
2108 ncols
2109 );
2110 }
2111 for (i, v) in row.iter().enumerate() {
2112 cols[i].push(v);
2113 }
2114 }
2115
2116 let mut col_iters = Vec::with_capacity(ncols);
2118 let mut col_types = Vec::with_capacity(ncols);
2119 for col in &cols {
2120 let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2121 let mut col_type = ecx.column_type(&col[0]);
2122 for val in &col[1..] {
2123 col_type = col_type.union(&ecx.column_type(val))?;
2124 }
2125 col_types.push(col_type);
2126 col_iters.push(col.into_iter());
2127 }
2128
2129 let mut exprs = vec![];
2131 for _ in 0..nrows {
2132 for i in 0..ncols {
2133 exprs.push(col_iters[i].next().unwrap());
2134 }
2135 }
2136 let out = HirRelationExpr::CallTable {
2137 func: TableFunc::Wrap {
2138 width: ncols,
2139 types: col_types,
2140 },
2141 exprs,
2142 };
2143
2144 let mut scope = Scope::empty();
2146 for i in 0..ncols {
2147 let name = format!("column{}", i + 1);
2148 scope.items.push(ScopeItem::from_column_name(name));
2149 }
2150
2151 Ok((out, scope))
2152}
2153
2154fn plan_values_insert(
2164 qcx: &QueryContext,
2165 target_names: &[&ColumnName],
2166 target_types: &[&SqlScalarType],
2167 values: &[Vec<Expr<Aug>>],
2168) -> Result<HirRelationExpr, PlanError> {
2169 assert!(!values.is_empty());
2170
2171 if !values.iter().map(|row| row.len()).all_equal() {
2172 sql_bail!("VALUES lists must all be the same length");
2173 }
2174
2175 let ecx = &ExprContext {
2176 qcx,
2177 name: "VALUES",
2178 scope: &Scope::empty(),
2179 relation_type: &SqlRelationType::empty(),
2180 allow_aggregates: false,
2181 allow_subqueries: true,
2182 allow_parameters: true,
2183 allow_windows: false,
2184 };
2185
2186 let mut exprs = vec![];
2187 let mut types = vec![];
2188 for row in values {
2189 if row.len() > target_names.len() {
2190 sql_bail!("INSERT has more expressions than target columns");
2191 }
2192 for (column, val) in row.into_iter().enumerate() {
2193 let target_type = &target_types[column];
2194 let val = plan_expr(ecx, val)?;
2195 let val = typeconv::plan_coerce(ecx, val, target_type)?;
2196 let source_type = &ecx.scalar_type(&val);
2197 let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2198 Ok(val) => val,
2199 Err(_) => sql_bail!(
2200 "column {} is of type {} but expression is of type {}",
2201 target_names[column].quoted(),
2202 qcx.humanize_scalar_type(target_type, false),
2203 qcx.humanize_scalar_type(source_type, false),
2204 ),
2205 };
2206 if column >= types.len() {
2207 types.push(ecx.column_type(&val));
2208 } else {
2209 types[column] = types[column].union(&ecx.column_type(&val))?;
2210 }
2211 exprs.push(val);
2212 }
2213 }
2214
2215 Ok(HirRelationExpr::CallTable {
2216 func: TableFunc::Wrap {
2217 width: values[0].len(),
2218 types,
2219 },
2220 exprs,
2221 })
2222}
2223
2224fn plan_join_identity() -> (HirRelationExpr, Scope) {
2225 let typ = SqlRelationType::new(vec![]);
2226 let expr = HirRelationExpr::constant(vec![vec![]], typ);
2227 let scope = Scope::empty();
2228 (expr, scope)
2229}
2230
/// The result of planning a `SELECT` via `plan_select_from_where`.
///
/// As with [`PlannedQuery`], the ordering and projection are returned
/// separately: callers in this file either copy them into a `PlannedQuery`
/// (`plan_query_inner`) or apply `project` directly to `expr`
/// (`plan_set_expr`).
#[derive(Debug)]
struct SelectPlan {
    /// The planned relation expression.
    expr: HirRelationExpr,
    /// The scope returned alongside the planned expression.
    scope: Scope,
    /// The planned `ORDER BY` column orderings.
    order_by: Vec<ColumnOrder>,
    /// The column indices of `expr` to project.
    project: Vec<usize>,
}
2243
// Generates `SelectOptionExtracted`, which is built from a `SELECT`'s options
// via `try_from` and then converted into `GroupSizeHints`. Each of the four
// options carries a `u64`.
generate_extracted_config!(
    SelectOption,
    (ExpectedGroupSize, u64),
    (AggregateInputGroupSize, u64),
    (DistinctOnInputGroupSize, u64),
    (LimitInputGroupSize, u64)
);
2251
2252fn plan_select_from_where(
2270 qcx: &QueryContext,
2271 mut s: Select<Aug>,
2272 mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2273) -> Result<SelectPlan, PlanError> {
2274 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2281 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2282
2283 let (mut relation_expr, mut from_scope) =
2285 s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2286 let (left, left_scope) = l;
2287 plan_join(
2288 qcx,
2289 left,
2290 left_scope,
2291 &Join {
2292 relation: TableFactor::NestedJoin {
2293 join: Box::new(twj.clone()),
2294 alias: None,
2295 },
2296 join_operator: JoinOperator::CrossJoin,
2297 },
2298 )
2299 })?;
2300
2301 if let Some(selection) = &s.selection {
2303 let ecx = &ExprContext {
2304 qcx,
2305 name: "WHERE clause",
2306 scope: &from_scope,
2307 relation_type: &qcx.relation_type(&relation_expr),
2308 allow_aggregates: false,
2309 allow_subqueries: true,
2310 allow_parameters: true,
2311 allow_windows: false,
2312 };
2313 let expr = plan_expr(ecx, selection)
2314 .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2315 .type_as(ecx, &SqlScalarType::Bool)?;
2316 relation_expr = relation_expr.filter(vec![expr]);
2317 }
2318
2319 let (aggregates, table_funcs) = {
2322 let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2323 visitor.visit_select_mut(&mut s);
2324 for o in order_by_exprs.iter_mut() {
2325 visitor.visit_order_by_expr_mut(o);
2326 }
2327 visitor.into_result()?
2328 };
2329 let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2330 if !table_funcs.is_empty() {
2331 let (expr, scope) = plan_scalar_table_funcs(
2332 qcx,
2333 table_funcs,
2334 &mut table_func_names,
2335 &relation_expr,
2336 &from_scope,
2337 )?;
2338 relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2339 from_scope = from_scope.product(scope)?;
2340 }
2341
2342 let projection = {
2344 let ecx = &ExprContext {
2345 qcx,
2346 name: "SELECT clause",
2347 scope: &from_scope,
2348 relation_type: &qcx.relation_type(&relation_expr),
2349 allow_aggregates: true,
2350 allow_subqueries: true,
2351 allow_parameters: true,
2352 allow_windows: true,
2353 };
2354 let mut out = vec![];
2355 for si in &s.projection {
2356 if *si == SelectItem::Wildcard && s.from.is_empty() {
2357 sql_bail!("SELECT * with no tables specified is not valid");
2358 }
2359 out.extend(expand_select_item(ecx, si, &table_func_names)?);
2360 }
2361 out
2362 };
2363
2364 let (mut group_scope, select_all_mapping) = {
2368 let ecx = &ExprContext {
2370 qcx,
2371 name: "GROUP BY clause",
2372 scope: &from_scope,
2373 relation_type: &qcx.relation_type(&relation_expr),
2374 allow_aggregates: false,
2375 allow_subqueries: true,
2376 allow_parameters: true,
2377 allow_windows: false,
2378 };
2379 let mut group_key = vec![];
2380 let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2381 let mut group_hir_exprs = vec![];
2382 let mut group_scope = Scope::empty();
2383 let mut select_all_mapping = BTreeMap::new();
2384
2385 for group_expr in &s.group_by {
2386 let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2387 let new_column = group_key.len();
2388
2389 if let Some(group_expr) = group_expr {
2390 if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2394 existing_scope_item.exprs.insert(group_expr.clone());
2395 continue;
2396 }
2397 }
2398
2399 let mut scope_item = if let HirScalarExpr::Column(
2400 ColumnRef {
2401 level: 0,
2402 column: old_column,
2403 },
2404 _name,
2405 ) = &expr
2406 {
2407 select_all_mapping.insert(*old_column, new_column);
2413 let scope_item = ecx.scope.items[*old_column].clone();
2414 scope_item
2415 } else {
2416 ScopeItem::empty()
2417 };
2418
2419 if let Some(group_expr) = group_expr.cloned() {
2420 scope_item.exprs.insert(group_expr);
2421 }
2422
2423 group_key.push(from_scope.len() + group_exprs.len());
2424 group_hir_exprs.push(expr.clone());
2425 group_exprs.insert(expr, scope_item);
2426 }
2427
2428 assert_eq!(group_hir_exprs.len(), group_exprs.len());
2429 for expr in &group_hir_exprs {
2430 if let Some(scope_item) = group_exprs.remove(expr) {
2431 group_scope.items.push(scope_item);
2432 }
2433 }
2434
2435 let ecx = &ExprContext {
2437 qcx,
2438 name: "aggregate function",
2439 scope: &from_scope,
2440 relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2441 allow_aggregates: false,
2442 allow_subqueries: true,
2443 allow_parameters: true,
2444 allow_windows: false,
2445 };
2446 let mut agg_exprs = vec![];
2447 for sql_function in aggregates {
2448 if sql_function.over.is_some() {
2449 unreachable!(
2450 "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2451 );
2452 }
2453 agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2454 group_scope
2455 .items
2456 .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2457 }
2458 if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2459 relation_expr = relation_expr.map(group_hir_exprs).reduce(
2461 group_key,
2462 agg_exprs,
2463 group_size_hints.aggregate_input_group_size,
2464 );
2465
2466 for i in 0..from_scope.len() {
2472 if !select_all_mapping.contains_key(&i) {
2473 let scope_item = &ecx.scope.items[i];
2474 group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2475 table_name: scope_item.table_name.clone(),
2476 column_name: scope_item.column_name.clone(),
2477 allow_unqualified_references: scope_item.allow_unqualified_references,
2478 });
2479 }
2480 }
2481
2482 (group_scope, select_all_mapping)
2483 } else {
2484 (
2486 from_scope.clone(),
2487 (0..from_scope.len()).map(|i| (i, i)).collect(),
2488 )
2489 }
2490 };
2491
2492 if let Some(ref having) = s.having {
2494 let ecx = &ExprContext {
2495 qcx,
2496 name: "HAVING clause",
2497 scope: &group_scope,
2498 relation_type: &qcx.relation_type(&relation_expr),
2499 allow_aggregates: true,
2500 allow_subqueries: true,
2501 allow_parameters: true,
2502 allow_windows: false,
2503 };
2504 let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2505 relation_expr = relation_expr.filter(vec![expr]);
2506 }
2507
2508 let window_funcs = {
2521 let mut visitor = WindowFuncCollector::default();
2522 visitor.visit_select(&s);
2526 for o in order_by_exprs.iter() {
2527 visitor.visit_order_by_expr(o);
2528 }
2529 visitor.into_result()
2530 };
2531 for window_func in window_funcs {
2532 let ecx = &ExprContext {
2533 qcx,
2534 name: "window function",
2535 scope: &group_scope,
2536 relation_type: &qcx.relation_type(&relation_expr),
2537 allow_aggregates: true,
2538 allow_subqueries: true,
2539 allow_parameters: true,
2540 allow_windows: true,
2541 };
2542 relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2543 group_scope.items.push(ScopeItem::from_expr(window_func));
2544 }
2545 if let Some(ref qualify) = s.qualify {
2552 let ecx = &ExprContext {
2553 qcx,
2554 name: "QUALIFY clause",
2555 scope: &group_scope,
2556 relation_type: &qcx.relation_type(&relation_expr),
2557 allow_aggregates: true,
2558 allow_subqueries: true,
2559 allow_parameters: true,
2560 allow_windows: true,
2561 };
2562 let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2563 relation_expr = relation_expr.filter(vec![expr]);
2564 }
2565
2566 let output_columns = {
2568 let mut new_exprs = vec![];
2569 let mut new_type = qcx.relation_type(&relation_expr);
2570 let mut output_columns = vec![];
2571 for (select_item, column_name) in &projection {
2572 let ecx = &ExprContext {
2573 qcx,
2574 name: "SELECT clause",
2575 scope: &group_scope,
2576 relation_type: &new_type,
2577 allow_aggregates: true,
2578 allow_subqueries: true,
2579 allow_parameters: true,
2580 allow_windows: true,
2581 };
2582 let expr = match select_item {
2583 ExpandedSelectItem::InputOrdinal(i) => {
2584 if let Some(column) = select_all_mapping.get(i).copied() {
2585 HirScalarExpr::column(column)
2586 } else {
2587 return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2588 }
2589 }
2590 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2591 };
2592 if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2593 output_columns.push((column, column_name));
2595 } else {
2596 let typ = ecx.column_type(&expr);
2603 new_type.column_types.push(typ);
2604 new_exprs.push(expr);
2605 output_columns.push((group_scope.len(), column_name));
2606 group_scope
2607 .items
2608 .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2609 }
2610 }
2611 relation_expr = relation_expr.map(new_exprs);
2612 output_columns
2613 };
2614 let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2615
2616 let order_by = {
2618 let relation_type = qcx.relation_type(&relation_expr);
2619 let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2620 &ExprContext {
2621 qcx,
2622 name: "ORDER BY clause",
2623 scope: &group_scope,
2624 relation_type: &relation_type,
2625 allow_aggregates: true,
2626 allow_subqueries: true,
2627 allow_parameters: true,
2628 allow_windows: true,
2629 },
2630 &order_by_exprs,
2631 &output_columns,
2632 )?;
2633
2634 match s.distinct {
2635 None => relation_expr = relation_expr.map(map_exprs),
2636 Some(Distinct::EntireRow) => {
2637 if relation_type.arity() == 0 {
2638 sql_bail!("SELECT DISTINCT must have at least one column");
2639 }
2640 if !try_push_projection_order_by(
2644 &mut relation_expr,
2645 &mut project_key,
2646 &mut order_by,
2647 ) {
2648 sql_bail!(
2649 "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2650 );
2651 }
2652 assert!(map_exprs.is_empty());
2653 relation_expr = relation_expr.distinct();
2654 }
2655 Some(Distinct::On(exprs)) => {
2656 let ecx = &ExprContext {
2657 qcx,
2658 name: "DISTINCT ON clause",
2659 scope: &group_scope,
2660 relation_type: &qcx.relation_type(&relation_expr),
2661 allow_aggregates: true,
2662 allow_subqueries: true,
2663 allow_parameters: true,
2664 allow_windows: true,
2665 };
2666
2667 let mut distinct_exprs = vec![];
2668 for expr in &exprs {
2669 let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2670 distinct_exprs.push(expr);
2671 }
2672
2673 let mut distinct_key = vec![];
2674
2675 let arity = relation_type.arity();
2685 for ord in order_by.iter().take(distinct_exprs.len()) {
2686 let mut expr = &HirScalarExpr::column(ord.column);
2689 if ord.column >= arity {
2690 expr = &map_exprs[ord.column - arity];
2691 };
2692 match distinct_exprs.iter().position(move |e| e == expr) {
2693 None => sql_bail!(
2694 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2695 ),
2696 Some(pos) => {
2697 distinct_exprs.remove(pos);
2698 }
2699 }
2700 distinct_key.push(ord.column);
2701 }
2702
2703 for expr in distinct_exprs {
2705 let column = match expr {
2708 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2709 _ => {
2710 map_exprs.push(expr);
2711 arity + map_exprs.len() - 1
2712 }
2713 };
2714 distinct_key.push(column);
2715 }
2716
2717 let distinct_len = distinct_key.len();
2722 relation_expr = HirRelationExpr::top_k(
2723 relation_expr.map(map_exprs),
2724 distinct_key,
2725 order_by.iter().skip(distinct_len).cloned().collect(),
2726 Some(HirScalarExpr::literal(
2727 Datum::Int64(1),
2728 SqlScalarType::Int64,
2729 )),
2730 HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2731 group_size_hints.distinct_on_input_group_size,
2732 );
2733 }
2734 }
2735
2736 order_by
2737 };
2738
2739 let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2744
2745 Ok(SelectPlan {
2746 expr: relation_expr,
2747 scope,
2748 order_by,
2749 project: project_key,
2750 })
2751}
2752
2753fn plan_scalar_table_funcs(
2754 qcx: &QueryContext,
2755 table_funcs: BTreeMap<Function<Aug>, String>,
2756 table_func_names: &mut BTreeMap<String, Ident>,
2757 relation_expr: &HirRelationExpr,
2758 from_scope: &Scope,
2759) -> Result<(HirRelationExpr, Scope), PlanError> {
2760 let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2761
2762 for (table_func, id) in table_funcs.iter() {
2763 table_func_names.insert(
2764 id.clone(),
2765 Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2767 );
2768 }
2769 if table_funcs.len() == 1 {
2772 let (table_func, id) = table_funcs.iter().next().unwrap();
2773 let (expr, mut scope) =
2774 plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2775
2776 let num_cols = scope.len();
2778 for i in 0..scope.len() {
2779 scope.items[i].table_name = Some(PartialItemName {
2780 database: None,
2781 schema: None,
2782 item: id.clone(),
2783 });
2784 scope.items[i].from_single_column_function = num_cols == 1;
2785 scope.items[i].allow_unqualified_references = false;
2786 }
2787 return Ok((expr, scope));
2788 }
2789 let (expr, mut scope, num_cols) =
2791 plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2792
2793 let mut i = 0;
2795 for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2796 for _ in 0..num_cols {
2797 scope.items[i].table_name = Some(PartialItemName {
2798 database: None,
2799 schema: None,
2800 item: id.clone(),
2801 });
2802 scope.items[i].from_single_column_function = num_cols == 1;
2803 scope.items[i].allow_unqualified_references = false;
2804 i += 1;
2805 }
2806 scope.items[i].table_name = Some(PartialItemName {
2810 database: None,
2811 schema: None,
2812 item: id.clone(),
2813 });
2814 scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2815 scope.items[i].allow_unqualified_references = false;
2816 i += 1;
2817 }
2818 scope.items[i].allow_unqualified_references = false;
2820 Ok((expr, scope))
2821}
2822
2823fn plan_group_by_expr<'a>(
2830 ecx: &ExprContext,
2831 group_expr: &'a Expr<Aug>,
2832 projection: &'a [(ExpandedSelectItem, ColumnName)],
2833) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2834 let plan_projection = |column: usize| match &projection[column].0 {
2835 ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2836 ExpandedSelectItem::Expr(expr) => {
2837 Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2838 }
2839 };
2840
2841 if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2844 return plan_projection(column);
2845 }
2846
2847 match group_expr {
2851 Expr::Identifier(names) => match plan_identifier(ecx, names) {
2852 Err(PlanError::UnknownColumn {
2853 table: None,
2854 column,
2855 similar,
2856 }) => {
2857 let mut iter = projection.iter().map(|(_expr, name)| name);
2860 if let Some(i) = iter.position(|n| *n == column) {
2861 if iter.any(|n| *n == column) {
2862 Err(PlanError::AmbiguousColumn(column))
2863 } else {
2864 plan_projection(i)
2865 }
2866 } else {
2867 Err(PlanError::UnknownColumn {
2870 table: None,
2871 column,
2872 similar,
2873 })
2874 }
2875 }
2876 res => Ok((Some(group_expr), res?)),
2877 },
2878 _ => Ok((
2879 Some(group_expr),
2880 plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2881 )),
2882 }
2883}
2884
2885pub(crate) fn plan_order_by_exprs(
2893 ecx: &ExprContext,
2894 order_by_exprs: &[OrderByExpr<Aug>],
2895 output_columns: &[(usize, &ColumnName)],
2896) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2897 let mut order_by = vec![];
2898 let mut map_exprs = vec![];
2899 for obe in order_by_exprs {
2900 let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2901 let column = match expr {
2904 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2905 _ => {
2906 map_exprs.push(expr);
2907 ecx.relation_type.arity() + map_exprs.len() - 1
2908 }
2909 };
2910 order_by.push(resolve_desc_and_nulls_last(obe, column));
2911 }
2912 Ok((order_by, map_exprs))
2913}
2914
2915fn plan_order_by_or_distinct_expr(
2933 ecx: &ExprContext,
2934 expr: &Expr<Aug>,
2935 output_columns: &[(usize, &ColumnName)],
2936) -> Result<HirScalarExpr, PlanError> {
2937 if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2938 return Ok(HirScalarExpr::column(output_columns[i].0));
2939 }
2940
2941 if let Expr::Identifier(names) = expr {
2942 if let [name] = &names[..] {
2943 let name = normalize::column_name(name.clone());
2944 let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2945 if let Some((i, _)) = iter.next() {
2946 match iter.next() {
2947 Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2951 _ => return Ok(HirScalarExpr::column(*i)),
2952 }
2953 }
2954 }
2955 }
2956
2957 plan_expr(ecx, expr)?.type_as_any(ecx)
2958}
2959
2960fn plan_table_with_joins(
2961 qcx: &QueryContext,
2962 table_with_joins: &TableWithJoins<Aug>,
2963) -> Result<(HirRelationExpr, Scope), PlanError> {
2964 let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2965 for join in &table_with_joins.joins {
2966 let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2967 expr = new_expr;
2968 scope = new_scope;
2969 }
2970 Ok((expr, scope))
2971}
2972
2973fn plan_table_factor(
2974 qcx: &QueryContext,
2975 table_factor: &TableFactor<Aug>,
2976) -> Result<(HirRelationExpr, Scope), PlanError> {
2977 match table_factor {
2978 TableFactor::Table { name, alias } => {
2979 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2980 let scope = plan_table_alias(scope, alias.as_ref())?;
2981 Ok((expr, scope))
2982 }
2983
2984 TableFactor::Function {
2985 function,
2986 alias,
2987 with_ordinality,
2988 } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
2989
2990 TableFactor::RowsFrom {
2991 functions,
2992 alias,
2993 with_ordinality,
2994 } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
2995
2996 TableFactor::Derived {
2997 lateral,
2998 subquery,
2999 alias,
3000 } => {
3001 let mut qcx = (*qcx).clone();
3002 if !lateral {
3003 for scope in &mut qcx.outer_scopes {
3007 if scope.lateral_barrier {
3008 break;
3009 }
3010 scope.items.clear();
3011 }
3012 }
3013 qcx.outer_scopes[0].lateral_barrier = true;
3014 let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3015 let scope = plan_table_alias(scope, alias.as_ref())?;
3016 Ok((expr, scope))
3017 }
3018
3019 TableFactor::NestedJoin { join, alias } => {
3020 let (expr, scope) = plan_table_with_joins(qcx, join)?;
3021 let scope = plan_table_alias(scope, alias.as_ref())?;
3022 Ok((expr, scope))
3023 }
3024 }
3025}
3026
3027fn plan_rows_from(
3069 qcx: &QueryContext,
3070 functions: &[Function<Aug>],
3071 alias: Option<&TableAlias>,
3072 with_ordinality: bool,
3073) -> Result<(HirRelationExpr, Scope), PlanError> {
3074 if let [function] = functions {
3077 return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3078 }
3079
3080 let (expr, mut scope, num_cols) = plan_rows_from_internal(
3084 qcx,
3085 functions,
3086 Some(functions[0].name.full_item_name().clone()),
3087 )?;
3088
3089 let mut columns = Vec::new();
3091 let mut offset = 0;
3092 for (idx, cols) in num_cols.into_iter().enumerate() {
3094 for i in 0..cols {
3095 columns.push(offset + i);
3096 }
3097 offset += cols + 1;
3098
3099 scope.items.remove(offset - idx - 1);
3102 }
3103
3104 if with_ordinality {
3107 columns.push(offset);
3108 } else {
3109 scope.items.pop();
3110 }
3111
3112 let expr = expr.project(columns);
3113
3114 let scope = plan_table_alias(scope, alias)?;
3115 Ok((expr, scope))
3116}
3117
3118fn plan_rows_from_internal<'a>(
3141 qcx: &QueryContext,
3142 functions: impl IntoIterator<Item = &'a Function<Aug>>,
3143 table_name: Option<FullItemName>,
3144) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3145 let mut functions = functions.into_iter();
3146 let mut num_cols = Vec::new();
3147
3148 let (mut left_expr, mut left_scope) =
3152 plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3153 num_cols.push(left_scope.len() - 1);
3154 left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3156 left_scope
3157 .items
3158 .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));
3159
3160 for function in functions {
3161 let qcx = qcx.empty_derived_context();
3163 let (right_expr, mut right_scope) =
3164 plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3165 num_cols.push(right_scope.len() - 1);
3166 let left_col = left_scope.len() - 1;
3167 let right_col = left_scope.len() + right_scope.len() - 1;
3168 let on = HirScalarExpr::call_binary(
3169 HirScalarExpr::column(left_col),
3170 HirScalarExpr::column(right_col),
3171 expr_func::Eq,
3172 );
3173 left_expr = left_expr
3174 .join(right_expr, on, JoinKind::FullOuter)
3175 .map(vec![HirScalarExpr::call_variadic(
3176 VariadicFunc::Coalesce,
3177 vec![
3178 HirScalarExpr::column(left_col),
3179 HirScalarExpr::column(right_col),
3180 ],
3181 )]);
3182
3183 left_expr = left_expr.project(
3186 (0..left_col) .chain(left_col + 1..right_col + 2) .collect(),
3189 );
3190 right_scope.items.push(left_scope.items.pop().unwrap());
3192
3193 left_scope.items.extend(right_scope.items);
3194 }
3195
3196 Ok((left_expr, left_scope, num_cols))
3197}
3198
3199fn plan_solitary_table_function(
3203 qcx: &QueryContext,
3204 function: &Function<Aug>,
3205 alias: Option<&TableAlias>,
3206 with_ordinality: bool,
3207) -> Result<(HirRelationExpr, Scope), PlanError> {
3208 let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3209
3210 let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3211 if single_column_function {
3212 let item = &mut scope.items[0];
3213
3214 item.from_single_column_function = true;
3217
3218 if let Some(alias) = alias {
3233 if let ScopeItem {
3234 table_name: Some(table_name),
3235 column_name,
3236 ..
3237 } = item
3238 {
3239 if table_name.item.as_str() == column_name.as_str() {
3240 *column_name = normalize::column_name(alias.name.clone());
3241 }
3242 }
3243 }
3244 }
3245
3246 let scope = plan_table_alias(scope, alias)?;
3247 Ok((expr, scope))
3248}
3249
3250fn plan_table_function_internal(
3255 qcx: &QueryContext,
3256 Function {
3257 name,
3258 args,
3259 filter,
3260 over,
3261 distinct,
3262 }: &Function<Aug>,
3263 with_ordinality: bool,
3264 table_name: Option<FullItemName>,
3265) -> Result<(HirRelationExpr, Scope), PlanError> {
3266 assert_none!(filter, "cannot parse table function with FILTER");
3267 assert_none!(over, "cannot parse table function with OVER");
3268 assert!(!*distinct, "cannot parse table function with DISTINCT");
3269
3270 let ecx = &ExprContext {
3271 qcx,
3272 name: "table function arguments",
3273 scope: &Scope::empty(),
3274 relation_type: &SqlRelationType::empty(),
3275 allow_aggregates: false,
3276 allow_subqueries: true,
3277 allow_parameters: true,
3278 allow_windows: false,
3279 };
3280
3281 let scalar_args = match args {
3282 FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3283 FunctionArgs::Args { args, order_by } => {
3284 if !order_by.is_empty() {
3285 sql_bail!(
3286 "ORDER BY specified, but {} is not an aggregate function",
3287 name
3288 );
3289 }
3290 plan_exprs(ecx, args)?
3291 }
3292 };
3293
3294 let table_name = match table_name {
3295 Some(table_name) => table_name.item,
3296 None => name.full_item_name().item.clone(),
3297 };
3298
3299 let scope_name = Some(PartialItemName {
3300 database: None,
3301 schema: None,
3302 item: table_name,
3303 });
3304
3305 let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3306 Func::Table(impls) => {
3307 let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3308 let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3309 let expr = match tf.imp {
3310 TableFuncImpl::CallTable { mut func, exprs } => {
3311 if with_ordinality {
3312 func = TableFunc::with_ordinality(func.clone()).ok_or(
3313 PlanError::Unsupported {
3314 feature: format!("WITH ORDINALITY on {}", func),
3315 discussion_no: None,
3316 },
3317 )?;
3318 }
3319 HirRelationExpr::CallTable { func, exprs }
3320 }
3321 TableFuncImpl::Expr(expr) => {
3322 if !with_ordinality {
3323 expr
3324 } else {
3325 if qcx
3329 .scx
3330 .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3331 {
3332 tracing::error!(
3336 %name,
3337 "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3338 );
3339 expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3340 func: WindowExprType::Scalar(ScalarWindowExpr {
3341 func: ScalarWindowFunc::RowNumber,
3342 order_by: vec![],
3343 }),
3344 partition_by: vec![],
3345 order_by: vec![],
3346 })])
3347 } else {
3348 bail_unsupported!(format!(
3349 "WITH ORDINALITY or ROWS FROM with {}",
3350 name
3351 ));
3352 }
3353 }
3354 }
3355 };
3356 (expr, scope)
3357 }
3358 Func::Scalar(impls) => {
3359 let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3360 let output = expr.typ(
3361 &qcx.outer_relation_types,
3362 &SqlRelationType::new(vec![]),
3363 &qcx.scx.param_types.borrow(),
3364 );
3365
3366 let relation = SqlRelationType::new(vec![output]);
3367
3368 let function_ident = Ident::new(name.full_item_name().item.clone())?;
3369 let column_name = normalize::column_name(function_ident);
3370 let name = column_name.to_string();
3371
3372 let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3373
3374 let mut func = TableFunc::TabletizedScalar { relation, name };
3375 if with_ordinality {
3376 func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3377 feature: format!("WITH ORDINALITY on {}", func),
3378 discussion_no: None,
3379 })?;
3380 }
3381 (
3382 HirRelationExpr::CallTable {
3383 func,
3384 exprs: vec![expr],
3385 },
3386 scope,
3387 )
3388 }
3389 o => sql_bail!(
3390 "{} functions are not supported in functions in FROM",
3391 o.class()
3392 ),
3393 };
3394
3395 if with_ordinality {
3396 scope
3397 .items
3398 .push(ScopeItem::from_name(scope_name, "ordinality"));
3399 }
3400
3401 Ok((expr, scope))
3402}
3403
3404fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3405 if let Some(TableAlias {
3406 name,
3407 columns,
3408 strict,
3409 }) = alias
3410 {
3411 if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3412 sql_bail!(
3413 "{} has {} columns available but {} columns specified",
3414 name,
3415 scope.items.len(),
3416 columns.len()
3417 );
3418 }
3419
3420 let table_name = normalize::ident(name.to_owned());
3421 for (i, item) in scope.items.iter_mut().enumerate() {
3422 item.table_name = if item.allow_unqualified_references {
3423 Some(PartialItemName {
3424 database: None,
3425 schema: None,
3426 item: table_name.clone(),
3427 })
3428 } else {
3429 None
3463 };
3464 item.column_name = columns
3465 .get(i)
3466 .map(|a| normalize::column_name(a.clone()))
3467 .unwrap_or_else(|| item.column_name.clone());
3468 }
3469 }
3470 Ok(scope)
3471}
3472
3473fn invent_column_name(
3477 ecx: &ExprContext,
3478 expr: &Expr<Aug>,
3479 table_func_names: &BTreeMap<String, Ident>,
3480) -> Option<ColumnName> {
3481 #[derive(Debug)]
3488 enum NameQuality {
3489 Low,
3490 High,
3491 }
3492
3493 fn invent(
3494 ecx: &ExprContext,
3495 expr: &Expr<Aug>,
3496 table_func_names: &BTreeMap<String, Ident>,
3497 ) -> Option<(ColumnName, NameQuality)> {
3498 match expr {
3499 Expr::Identifier(names) => {
3500 if let [name] = names.as_slice() {
3501 if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3502 return Some((
3503 normalize::column_name(table_func_name.clone()),
3504 NameQuality::High,
3505 ));
3506 }
3507 }
3508 names
3509 .last()
3510 .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3511 }
3512 Expr::Value(v) => match v {
3513 Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3516 Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3517 _ => None,
3518 },
3519 Expr::Function(func) => {
3520 let (schema, item) = match &func.name {
3521 ResolvedItemName::Item {
3522 qualifiers,
3523 full_name,
3524 ..
3525 } => (&qualifiers.schema_spec, full_name.item.clone()),
3526 _ => unreachable!(),
3527 };
3528
3529 if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3530 || schema
3531 == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3532 {
3533 None
3534 } else {
3535 Some((item.into(), NameQuality::High))
3536 }
3537 }
3538 Expr::HomogenizingFunction { function, .. } => Some((
3539 function.to_string().to_lowercase().into(),
3540 NameQuality::High,
3541 )),
3542 Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3543 Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3544 Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3545 Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3546 Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
3547 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3548 _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3549 },
3550 Expr::Case { else_result, .. } => {
3551 match else_result
3552 .as_ref()
3553 .and_then(|else_result| invent(ecx, else_result, table_func_names))
3554 {
3555 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3556 _ => Some(("case".into(), NameQuality::Low)),
3557 }
3558 }
3559 Expr::FieldAccess { field, .. } => {
3560 Some((normalize::column_name(field.clone()), NameQuality::High))
3561 }
3562 Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3563 Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
3564 Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3565 let (_expr, scope) =
3569 plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
3570 scope
3571 .items
3572 .first()
3573 .map(|name| (name.column_name.clone(), NameQuality::High))
3574 }
3575 Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3576 _ => None,
3577 }
3578 }
3579
3580 invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
3581}
3582
3583#[derive(Debug)]
3584enum ExpandedSelectItem<'a> {
3585 InputOrdinal(usize),
3586 Expr(Cow<'a, Expr<Aug>>),
3587}
3588
3589impl ExpandedSelectItem<'_> {
3590 fn as_expr(&self) -> Option<&Expr<Aug>> {
3591 match self {
3592 ExpandedSelectItem::InputOrdinal(_) => None,
3593 ExpandedSelectItem::Expr(expr) => Some(expr),
3594 }
3595 }
3596}
3597
3598fn expand_select_item<'a>(
3599 ecx: &ExprContext,
3600 s: &'a SelectItem<Aug>,
3601 table_func_names: &BTreeMap<String, Ident>,
3602) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3603 match s {
3604 SelectItem::Expr {
3605 expr: Expr::QualifiedWildcard(table_name),
3606 alias: _,
3607 } => {
3608 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3609 let table_name =
3610 normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3611 let out: Vec<_> = ecx
3612 .scope
3613 .items
3614 .iter()
3615 .enumerate()
3616 .filter(|(_i, item)| item.is_from_table(&table_name))
3617 .map(|(i, item)| {
3618 let name = item.column_name.clone();
3619 (ExpandedSelectItem::InputOrdinal(i), name)
3620 })
3621 .collect();
3622 if out.is_empty() {
3623 sql_bail!("no table named '{}' in scope", table_name);
3624 }
3625 Ok(out)
3626 }
3627 SelectItem::Expr {
3628 expr: Expr::WildcardAccess(sql_expr),
3629 alias: _,
3630 } => {
3631 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3632 let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3638 let fields = match ecx.scalar_type(&expr) {
3639 SqlScalarType::Record { fields, .. } => fields,
3640 ty => sql_bail!(
3641 "type {} is not composite",
3642 ecx.humanize_scalar_type(&ty, false)
3643 ),
3644 };
3645 let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3646 if let Expr::Identifier(ident) = sql_expr.as_ref() {
3647 if let [name] = ident.as_slice() {
3648 if let Ok(items) = ecx.scope.items_from_table(
3649 &[],
3650 &PartialItemName {
3651 database: None,
3652 schema: None,
3653 item: name.as_str().to_string(),
3654 },
3655 ) {
3656 for (_, item) in items {
3657 if item
3658 .is_exists_column_for_a_table_function_that_was_in_the_target_list
3659 {
3660 skip_cols.insert(item.column_name.clone());
3661 }
3662 }
3663 }
3664 }
3665 }
3666 let items = fields
3667 .iter()
3668 .filter_map(|(name, _ty)| {
3669 if skip_cols.contains(name) {
3670 None
3671 } else {
3672 let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3673 expr: sql_expr.clone(),
3674 field: name.clone().into(),
3675 }));
3676 Some((item, name.clone()))
3677 }
3678 })
3679 .collect();
3680 Ok(items)
3681 }
3682 SelectItem::Wildcard => {
3683 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3684 let items: Vec<_> = ecx
3685 .scope
3686 .items
3687 .iter()
3688 .enumerate()
3689 .filter(|(_i, item)| item.allow_unqualified_references)
3690 .map(|(i, item)| {
3691 let name = item.column_name.clone();
3692 (ExpandedSelectItem::InputOrdinal(i), name)
3693 })
3694 .collect();
3695
3696 Ok(items)
3697 }
3698 SelectItem::Expr { expr, alias } => {
3699 let name = alias
3700 .clone()
3701 .map(normalize::column_name)
3702 .or_else(|| invent_column_name(ecx, expr, table_func_names))
3703 .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into());
3704 Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3705 }
3706 }
3707}
3708
3709fn plan_join(
3710 left_qcx: &QueryContext,
3711 left: HirRelationExpr,
3712 left_scope: Scope,
3713 join: &Join<Aug>,
3714) -> Result<(HirRelationExpr, Scope), PlanError> {
3715 const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3716 let (kind, constraint) = match &join.join_operator {
3717 JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3718 JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3719 JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3720 JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3721 JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3722 };
3723
3724 let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3725 if !kind.can_be_correlated() {
3726 for item in &mut right_qcx.outer_scopes[0].items {
3727 item.error_if_referenced =
3732 Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3733 table: table.cloned(),
3734 column: column.clone(),
3735 });
3736 }
3737 }
3738 let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3739
3740 let (expr, scope) = match constraint {
3741 JoinConstraint::On(expr) => {
3742 let product_scope = left_scope.product(right_scope)?;
3743 let ecx = &ExprContext {
3744 qcx: left_qcx,
3745 name: "ON clause",
3746 scope: &product_scope,
3747 relation_type: &SqlRelationType::new(
3748 left_qcx
3749 .relation_type(&left)
3750 .column_types
3751 .into_iter()
3752 .chain(right_qcx.relation_type(&right).column_types)
3753 .collect(),
3754 ),
3755 allow_aggregates: false,
3756 allow_subqueries: true,
3757 allow_parameters: true,
3758 allow_windows: false,
3759 };
3760 let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3761 let joined = left.join(right, on, kind);
3762 (joined, product_scope)
3763 }
3764 JoinConstraint::Using { columns, alias } => {
3765 let column_names = columns
3766 .iter()
3767 .map(|ident| normalize::column_name(ident.clone()))
3768 .collect::<Vec<_>>();
3769
3770 plan_using_constraint(
3771 &column_names,
3772 left_qcx,
3773 left,
3774 left_scope,
3775 &right_qcx,
3776 right,
3777 right_scope,
3778 kind,
3779 alias.as_ref(),
3780 )?
3781 }
3782 JoinConstraint::Natural => {
3783 *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3786 *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3787 let left_column_names = left_scope.column_names();
3788 let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3789 let column_names: Vec<_> = left_column_names
3790 .filter(|col| right_column_names.contains(col))
3791 .cloned()
3792 .collect();
3793 plan_using_constraint(
3794 &column_names,
3795 left_qcx,
3796 left,
3797 left_scope,
3798 &right_qcx,
3799 right,
3800 right_scope,
3801 kind,
3802 None,
3803 )?
3804 }
3805 };
3806 Ok((expr, scope))
3807}
3808
3809#[allow(clippy::too_many_arguments)]
3811fn plan_using_constraint(
3812 column_names: &[ColumnName],
3813 left_qcx: &QueryContext,
3814 left: HirRelationExpr,
3815 left_scope: Scope,
3816 right_qcx: &QueryContext,
3817 right: HirRelationExpr,
3818 right_scope: Scope,
3819 kind: JoinKind,
3820 alias: Option<&Ident>,
3821) -> Result<(HirRelationExpr, Scope), PlanError> {
3822 let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3823
3824 let mut unique_column_names = BTreeSet::new();
3827 for c in column_names {
3828 if !unique_column_names.insert(c) {
3829 return Err(PlanError::Unsupported {
3830 feature: format!(
3831 "column name {} appears more than once in USING clause",
3832 c.quoted()
3833 ),
3834 discussion_no: None,
3835 });
3836 }
3837 }
3838
3839 let alias_item_name = alias.map(|alias| PartialItemName {
3840 database: None,
3841 schema: None,
3842 item: alias.clone().to_string(),
3843 });
3844
3845 if let Some(alias_item_name) = &alias_item_name {
3846 for partial_item_name in both_scope.table_names() {
3847 if partial_item_name.matches(alias_item_name) {
3848 sql_bail!(
3849 "table name \"{}\" specified more than once",
3850 alias_item_name
3851 )
3852 }
3853 }
3854 }
3855
3856 let ecx = &ExprContext {
3857 qcx: right_qcx,
3858 name: "USING clause",
3859 scope: &both_scope,
3860 relation_type: &SqlRelationType::new(
3861 left_qcx
3862 .relation_type(&left)
3863 .column_types
3864 .into_iter()
3865 .chain(right_qcx.relation_type(&right).column_types)
3866 .collect(),
3867 ),
3868 allow_aggregates: false,
3869 allow_subqueries: false,
3870 allow_parameters: false,
3871 allow_windows: false,
3872 };
3873
3874 let mut join_exprs = vec![];
3875 let mut map_exprs = vec![];
3876 let mut new_items = vec![];
3877 let mut join_cols = vec![];
3878 let mut hidden_cols = vec![];
3879
3880 for column_name in column_names {
3881 let (lhs, lhs_name) = left_scope.resolve_using_column(
3883 column_name,
3884 JoinSide::Left,
3885 &mut left_qcx.name_manager.borrow_mut(),
3886 )?;
3887 let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3888 column_name,
3889 JoinSide::Right,
3890 &mut right_qcx.name_manager.borrow_mut(),
3891 )?;
3892
3893 rhs.column += left_scope.len();
3895
3896 let mut exprs = coerce_homogeneous_exprs(
3898 &ecx.with_name(&format!(
3899 "NATURAL/USING join column {}",
3900 column_name.quoted()
3901 )),
3902 vec![
3903 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3904 lhs,
3905 Arc::clone(&lhs_name),
3906 )),
3907 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3908 rhs,
3909 Arc::clone(&rhs_name),
3910 )),
3911 ],
3912 None,
3913 )?;
3914 let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3915
3916 match kind {
3917 JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3918 join_cols.push(lhs.column);
3919 hidden_cols.push(rhs.column);
3920 }
3921 JoinKind::RightOuter => {
3922 join_cols.push(rhs.column);
3923 hidden_cols.push(lhs.column);
3924 }
3925 JoinKind::FullOuter => {
3926 join_cols.push(both_scope.items.len() + map_exprs.len());
3929 hidden_cols.push(lhs.column);
3930 hidden_cols.push(rhs.column);
3931 map_exprs.push(HirScalarExpr::call_variadic(
3932 VariadicFunc::Coalesce,
3933 vec![expr1.clone(), expr2.clone()],
3934 ));
3935 new_items.push(ScopeItem::from_column_name(column_name));
3936 }
3937 }
3938
3939 if alias_item_name.is_some() {
3944 let new_item_col = both_scope.items.len() + new_items.len();
3945 join_cols.push(new_item_col);
3946 hidden_cols.push(new_item_col);
3947
3948 new_items.push(ScopeItem::from_name(
3949 alias_item_name.clone(),
3950 column_name.clone().to_string(),
3951 ));
3952
3953 map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
3957 }
3958
3959 join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
3960 }
3961 both_scope.items.extend(new_items);
3962
3963 for c in hidden_cols {
3967 both_scope.items[c].allow_unqualified_references = false;
3968 }
3969
3970 let project_key = join_cols
3972 .into_iter()
3973 .chain(0..both_scope.items.len())
3974 .unique()
3975 .collect::<Vec<_>>();
3976
3977 both_scope = both_scope.project(&project_key);
3978
3979 let on = HirScalarExpr::variadic_and(join_exprs);
3980
3981 let both = left
3982 .join(right, on, kind)
3983 .map(map_exprs)
3984 .project(project_key);
3985 Ok((both, both_scope))
3986}
3987
3988pub fn plan_expr<'a>(
3989 ecx: &'a ExprContext,
3990 e: &Expr<Aug>,
3991) -> Result<CoercibleScalarExpr, PlanError> {
3992 ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
3993}
3994
3995fn plan_expr_inner<'a>(
3996 ecx: &'a ExprContext,
3997 e: &Expr<Aug>,
3998) -> Result<CoercibleScalarExpr, PlanError> {
3999 if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4000 return Ok(HirScalarExpr::named_column(
4002 i,
4003 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4004 )
4005 .into());
4006 }
4007
4008 match e {
4009 Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4011 Ok(plan_identifier(ecx, names)?.into())
4012 }
4013
4014 Expr::Value(val) => plan_literal(val),
4016 Expr::Parameter(n) => plan_parameter(ecx, *n),
4017 Expr::Array(exprs) => plan_array(ecx, exprs, None),
4018 Expr::List(exprs) => plan_list(ecx, exprs, None),
4019 Expr::Map(exprs) => plan_map(ecx, exprs, None),
4020 Expr::Row { exprs } => plan_row(ecx, exprs),
4021
4022 Expr::Op { op, expr1, expr2 } => {
4024 Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4025 }
4026 Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4027 Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4028
4029 Expr::Not { expr } => plan_not(ecx, expr),
4031 Expr::And { left, right } => plan_and(ecx, left, right),
4032 Expr::Or { left, right } => plan_or(ecx, left, right),
4033 Expr::IsExpr {
4034 expr,
4035 construct,
4036 negated,
4037 } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4038 Expr::Case {
4039 operand,
4040 conditions,
4041 results,
4042 else_result,
4043 } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4044 Expr::HomogenizingFunction { function, exprs } => {
4045 plan_homogenizing_function(ecx, function, exprs)
4046 }
4047 Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4048 ecx,
4049 &None,
4050 &[l_expr.clone().equals(*r_expr.clone())],
4051 &[Expr::null()],
4052 &Some(Box::new(*l_expr.clone())),
4053 )?
4054 .into()),
4055 Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4056 Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4057 Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4058 Expr::Like {
4059 expr,
4060 pattern,
4061 escape,
4062 case_insensitive,
4063 negated,
4064 } => Ok(plan_like(
4065 ecx,
4066 expr,
4067 pattern,
4068 escape.as_deref(),
4069 *case_insensitive,
4070 *negated,
4071 )?
4072 .into()),
4073
4074 Expr::InList {
4075 expr,
4076 list,
4077 negated,
4078 } => plan_in_list(ecx, expr, list, negated),
4079
4080 Expr::Exists(query) => plan_exists(ecx, query),
4082 Expr::Subquery(query) => plan_subquery(ecx, query),
4083 Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4084 Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4085 Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4086 Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4087 Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
4088 Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
4089 Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
4090 Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
4091 Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
4092 Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
4093 Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
4094 }
4095}
4096
4097fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4098 if !ecx.allow_parameters {
4099 return Err(PlanError::UnknownParameter(n));
4103 }
4104 if n == 0 || n > 65536 {
4105 return Err(PlanError::UnknownParameter(n));
4106 }
4107 if ecx.param_types().borrow().contains_key(&n) {
4108 Ok(HirScalarExpr::parameter(n).into())
4109 } else {
4110 Ok(CoercibleScalarExpr::Parameter(n))
4111 }
4112}
4113
4114fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4115 let mut out = vec![];
4116 for e in exprs {
4117 out.push(plan_expr(ecx, e)?);
4118 }
4119 Ok(CoercibleScalarExpr::LiteralRecord(out))
4120}
4121
4122fn plan_cast(
4123 ecx: &ExprContext,
4124 expr: &Expr<Aug>,
4125 data_type: &ResolvedDataType,
4126) -> Result<CoercibleScalarExpr, PlanError> {
4127 let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4128 let expr = match expr {
4129 Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4138 Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4139 Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4140 _ => plan_expr(ecx, expr)?,
4141 };
4142 let ecx = &ecx.with_name("CAST");
4143 let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4144 let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4145 Ok(expr.into())
4146}
4147
4148fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4149 let ecx = ecx.with_name("NOT argument");
4150 Ok(plan_expr(&ecx, expr)?
4151 .type_as(&ecx, &SqlScalarType::Bool)?
4152 .call_unary(UnaryFunc::Not(expr_func::Not))
4153 .into())
4154}
4155
4156fn plan_and(
4157 ecx: &ExprContext,
4158 left: &Expr<Aug>,
4159 right: &Expr<Aug>,
4160) -> Result<CoercibleScalarExpr, PlanError> {
4161 let ecx = ecx.with_name("AND argument");
4162 Ok(HirScalarExpr::variadic_and(vec![
4163 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4164 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4165 ])
4166 .into())
4167}
4168
4169fn plan_or(
4170 ecx: &ExprContext,
4171 left: &Expr<Aug>,
4172 right: &Expr<Aug>,
4173) -> Result<CoercibleScalarExpr, PlanError> {
4174 let ecx = ecx.with_name("OR argument");
4175 Ok(HirScalarExpr::variadic_or(vec![
4176 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4177 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4178 ])
4179 .into())
4180}
4181
4182fn plan_in_list(
4183 ecx: &ExprContext,
4184 lhs: &Expr<Aug>,
4185 list: &Vec<Expr<Aug>>,
4186 negated: &bool,
4187) -> Result<CoercibleScalarExpr, PlanError> {
4188 let ecx = ecx.with_name("IN list");
4189 let or = HirScalarExpr::variadic_or(
4190 list.into_iter()
4191 .map(|e| {
4192 let eq = lhs.clone().equals(e.clone());
4193 plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4194 })
4195 .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4196 );
4197 Ok(if *negated {
4198 or.call_unary(UnaryFunc::Not(expr_func::Not))
4199 } else {
4200 or
4201 }
4202 .into())
4203}
4204
4205fn plan_homogenizing_function(
4206 ecx: &ExprContext,
4207 function: &HomogenizingFunction,
4208 exprs: &[Expr<Aug>],
4209) -> Result<CoercibleScalarExpr, PlanError> {
4210 assert!(!exprs.is_empty()); let expr = HirScalarExpr::call_variadic(
4212 match function {
4213 HomogenizingFunction::Coalesce => VariadicFunc::Coalesce,
4214 HomogenizingFunction::Greatest => VariadicFunc::Greatest,
4215 HomogenizingFunction::Least => VariadicFunc::Least,
4216 },
4217 coerce_homogeneous_exprs(
4218 &ecx.with_name(&function.to_string().to_lowercase()),
4219 plan_exprs(ecx, exprs)?,
4220 None,
4221 )?,
4222 );
4223 Ok(expr.into())
4224}
4225
4226fn plan_field_access(
4227 ecx: &ExprContext,
4228 expr: &Expr<Aug>,
4229 field: &Ident,
4230) -> Result<CoercibleScalarExpr, PlanError> {
4231 let field = normalize::column_name(field.clone());
4232 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4233 let ty = ecx.scalar_type(&expr);
4234 let i = match &ty {
4235 SqlScalarType::Record { fields, .. } => {
4236 fields.iter().position(|(name, _ty)| *name == field)
4237 }
4238 ty => sql_bail!(
4239 "column notation applied to type {}, which is not a composite type",
4240 ecx.humanize_scalar_type(ty, false)
4241 ),
4242 };
4243 match i {
4244 None => sql_bail!(
4245 "field {} not found in data type {}",
4246 field,
4247 ecx.humanize_scalar_type(&ty, false)
4248 ),
4249 Some(i) => Ok(expr
4250 .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4251 .into()),
4252 }
4253}
4254
4255fn plan_subscript(
4256 ecx: &ExprContext,
4257 expr: &Expr<Aug>,
4258 positions: &[SubscriptPosition<Aug>],
4259) -> Result<CoercibleScalarExpr, PlanError> {
4260 assert!(
4261 !positions.is_empty(),
4262 "subscript expression must contain at least one position"
4263 );
4264
4265 let ecx = &ecx.with_name("subscripting");
4266 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4267 let ty = ecx.scalar_type(&expr);
4268 match &ty {
4269 SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4270 ecx,
4271 expr,
4272 positions,
4273 if ty == SqlScalarType::Int2Vector {
4277 1
4278 } else {
4279 0
4280 },
4281 ),
4282 SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4283 SqlScalarType::List { element_type, .. } => {
4284 let elem_type_name = ecx.humanize_scalar_type(element_type, false);
4286 let n_layers = ty.unwrap_list_n_layers();
4287 plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4288 }
4289 ty => sql_bail!(
4290 "cannot subscript type {}",
4291 ecx.humanize_scalar_type(ty, false)
4292 ),
4293 }
4294}
4295
4296fn extract_scalar_subscript_from_positions<'a>(
4300 positions: &'a [SubscriptPosition<Aug>],
4301 expr_type_name: &str,
4302) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4303 let mut scalar_subscripts = Vec::with_capacity(positions.len());
4304 for p in positions {
4305 if p.explicit_slice {
4306 sql_bail!("{} subscript does not support slices", expr_type_name);
4307 }
4308 assert!(
4309 p.end.is_none(),
4310 "index-appearing subscripts cannot have end value"
4311 );
4312 scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4313 }
4314 Ok(scalar_subscripts)
4315}
4316
4317fn plan_subscript_array(
4318 ecx: &ExprContext,
4319 expr: HirScalarExpr,
4320 positions: &[SubscriptPosition<Aug>],
4321 offset: i64,
4322) -> Result<CoercibleScalarExpr, PlanError> {
4323 let mut exprs = Vec::with_capacity(positions.len() + 1);
4324 exprs.push(expr);
4325
4326 let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4329
4330 for i in indexes {
4331 exprs.push(plan_expr(ecx, i)?.cast_to(
4332 ecx,
4333 CastContext::Explicit,
4334 &SqlScalarType::Int64,
4335 )?);
4336 }
4337
4338 Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayIndex { offset }, exprs).into())
4339}
4340
4341fn plan_subscript_list(
4342 ecx: &ExprContext,
4343 mut expr: HirScalarExpr,
4344 positions: &[SubscriptPosition<Aug>],
4345 mut remaining_layers: usize,
4346 elem_type_name: &str,
4347) -> Result<CoercibleScalarExpr, PlanError> {
4348 let mut i = 0;
4349
4350 while i < positions.len() {
4351 let j = positions[i..]
4353 .iter()
4354 .position(|p| p.explicit_slice)
4355 .unwrap_or(positions.len() - i);
4356 if j != 0 {
4357 let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4358 let (n, e) = plan_index_list(
4359 ecx,
4360 expr,
4361 indexes.as_slice(),
4362 remaining_layers,
4363 elem_type_name,
4364 )?;
4365 remaining_layers = n;
4366 expr = e;
4367 i += j;
4368 }
4369
4370 let j = positions[i..]
4372 .iter()
4373 .position(|p| !p.explicit_slice)
4374 .unwrap_or(positions.len() - i);
4375 if j != 0 {
4376 expr = plan_slice_list(
4377 ecx,
4378 expr,
4379 &positions[i..i + j],
4380 remaining_layers,
4381 elem_type_name,
4382 )?;
4383 i += j;
4384 }
4385 }
4386
4387 Ok(expr.into())
4388}
4389
4390fn plan_index_list(
4391 ecx: &ExprContext,
4392 expr: HirScalarExpr,
4393 indexes: &[&Expr<Aug>],
4394 n_layers: usize,
4395 elem_type_name: &str,
4396) -> Result<(usize, HirScalarExpr), PlanError> {
4397 let depth = indexes.len();
4398
4399 if depth > n_layers {
4400 if n_layers == 0 {
4401 sql_bail!("cannot subscript type {}", elem_type_name)
4402 } else {
4403 sql_bail!(
4404 "cannot index into {} layers; list only has {} layer{}",
4405 depth,
4406 n_layers,
4407 if n_layers == 1 { "" } else { "s" }
4408 )
4409 }
4410 }
4411
4412 let mut exprs = Vec::with_capacity(depth + 1);
4413 exprs.push(expr);
4414
4415 for i in indexes {
4416 exprs.push(plan_expr(ecx, i)?.cast_to(
4417 ecx,
4418 CastContext::Explicit,
4419 &SqlScalarType::Int64,
4420 )?);
4421 }
4422
4423 Ok((
4424 n_layers - depth,
4425 HirScalarExpr::call_variadic(VariadicFunc::ListIndex, exprs),
4426 ))
4427}
4428
4429fn plan_slice_list(
4430 ecx: &ExprContext,
4431 expr: HirScalarExpr,
4432 slices: &[SubscriptPosition<Aug>],
4433 n_layers: usize,
4434 elem_type_name: &str,
4435) -> Result<HirScalarExpr, PlanError> {
4436 if n_layers == 0 {
4437 sql_bail!("cannot subscript type {}", elem_type_name)
4438 }
4439
4440 let mut exprs = Vec::with_capacity(slices.len() + 1);
4442 exprs.push(expr);
4443 let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4445 Ok(match position {
4446 Some(p) => {
4447 plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4448 }
4449 None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4450 })
4451 };
4452 for p in slices {
4453 let start = extract_position_or_default(p.start.as_ref(), 1)?;
4454 let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4455 exprs.push(start);
4456 exprs.push(end);
4457 }
4458
4459 Ok(HirScalarExpr::call_variadic(
4460 VariadicFunc::ListSliceLinear,
4461 exprs,
4462 ))
4463}
4464
4465fn plan_like(
4466 ecx: &ExprContext,
4467 expr: &Expr<Aug>,
4468 pattern: &Expr<Aug>,
4469 escape: Option<&Expr<Aug>>,
4470 case_insensitive: bool,
4471 not: bool,
4472) -> Result<HirScalarExpr, PlanError> {
4473 use CastContext::Implicit;
4474 let ecx = ecx.with_name("LIKE argument");
4475 let expr = plan_expr(&ecx, expr)?;
4476 let haystack = match ecx.scalar_type(&expr) {
4477 CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4478 .type_as(&ecx, ty)?
4479 .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4480 _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4481 };
4482 let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4483 if let Some(escape) = escape {
4484 pattern = pattern.call_binary(
4485 plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4486 expr_func::LikeEscape,
4487 );
4488 }
4489 let like = haystack.call_binary(pattern, BinaryFunc::IsLikeMatch { case_insensitive });
4490 if not {
4491 Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4492 } else {
4493 Ok(like)
4494 }
4495}
4496
4497fn plan_subscript_jsonb(
4498 ecx: &ExprContext,
4499 expr: HirScalarExpr,
4500 positions: &[SubscriptPosition<Aug>],
4501) -> Result<CoercibleScalarExpr, PlanError> {
4502 use CastContext::Implicit;
4503 use SqlScalarType::{Int64, String};
4504
4505 let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4508
4509 let mut exprs = Vec::with_capacity(subscripts.len());
4510 for s in subscripts {
4511 let subscript = plan_expr(ecx, s)?;
4512 let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4513 subscript
4514 } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4515 typeconv::to_string(ecx, subscript)
4519 } else {
4520 sql_bail!("jsonb subscript type must be coercible to integer or text");
4521 };
4522 exprs.push(subscript);
4523 }
4524
4525 let expr = expr.call_binary(
4528 HirScalarExpr::call_variadic(
4529 VariadicFunc::ArrayCreate {
4530 elem_type: SqlScalarType::String,
4531 },
4532 exprs,
4533 ),
4534 BinaryFunc::JsonbGetPath,
4535 );
4536 Ok(expr.into())
4537}
4538
4539fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4540 if !ecx.allow_subqueries {
4541 sql_bail!("{} does not allow subqueries", ecx.name)
4542 }
4543 let mut qcx = ecx.derived_query_context();
4544 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4545 Ok(expr.exists().into())
4546}
4547
4548fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4549 if !ecx.allow_subqueries {
4550 sql_bail!("{} does not allow subqueries", ecx.name)
4551 }
4552 let mut qcx = ecx.derived_query_context();
4553 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4554 let column_types = qcx.relation_type(&expr).column_types;
4555 if column_types.len() != 1 {
4556 sql_bail!(
4557 "Expected subselect to return 1 column, got {} columns",
4558 column_types.len()
4559 );
4560 }
4561 Ok(expr.select().into())
4562}
4563
4564fn plan_list_subquery(
4565 ecx: &ExprContext,
4566 query: &Query<Aug>,
4567) -> Result<CoercibleScalarExpr, PlanError> {
4568 plan_vector_like_subquery(
4569 ecx,
4570 query,
4571 |_| false,
4572 |elem_type| VariadicFunc::ListCreate { elem_type },
4573 |order_by| AggregateFunc::ListConcat { order_by },
4574 expr_func::ListListConcat.into(),
4575 |elem_type| {
4576 HirScalarExpr::literal(
4577 Datum::empty_list(),
4578 SqlScalarType::List {
4579 element_type: Box::new(elem_type),
4580 custom_id: None,
4581 },
4582 )
4583 },
4584 "list",
4585 )
4586}
4587
4588fn plan_array_subquery(
4589 ecx: &ExprContext,
4590 query: &Query<Aug>,
4591) -> Result<CoercibleScalarExpr, PlanError> {
4592 plan_vector_like_subquery(
4593 ecx,
4594 query,
4595 |elem_type| {
4596 matches!(
4597 elem_type,
4598 SqlScalarType::Char { .. }
4599 | SqlScalarType::Array { .. }
4600 | SqlScalarType::List { .. }
4601 | SqlScalarType::Map { .. }
4602 )
4603 },
4604 |elem_type| VariadicFunc::ArrayCreate { elem_type },
4605 |order_by| AggregateFunc::ArrayConcat { order_by },
4606 expr_func::ArrayArrayConcat.into(),
4607 |elem_type| {
4608 HirScalarExpr::literal(
4609 Datum::empty_array(),
4610 SqlScalarType::Array(Box::new(elem_type)),
4611 )
4612 },
4613 "[]",
4614 )
4615}
4616
4617fn plan_vector_like_subquery<F1, F2, F3, F4>(
4619 ecx: &ExprContext,
4620 query: &Query<Aug>,
4621 is_unsupported_type: F1,
4622 vector_create: F2,
4623 aggregate_concat: F3,
4624 binary_concat: BinaryFunc,
4625 empty_literal: F4,
4626 vector_type_string: &str,
4627) -> Result<CoercibleScalarExpr, PlanError>
4628where
4629 F1: Fn(&SqlScalarType) -> bool,
4630 F2: Fn(SqlScalarType) -> VariadicFunc,
4631 F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4632 F4: Fn(SqlScalarType) -> HirScalarExpr,
4633{
4634 if !ecx.allow_subqueries {
4635 sql_bail!("{} does not allow subqueries", ecx.name)
4636 }
4637
4638 let mut qcx = ecx.derived_query_context();
4639 let mut planned_query = plan_query(&mut qcx, query)?;
4640 if planned_query.limit.is_some()
4641 || !planned_query
4642 .offset
4643 .clone()
4644 .try_into_literal_int64()
4645 .is_ok_and(|offset| offset == 0)
4646 {
4647 planned_query.expr = HirRelationExpr::top_k(
4648 planned_query.expr,
4649 vec![],
4650 planned_query.order_by.clone(),
4651 planned_query.limit,
4652 planned_query.offset,
4653 planned_query.group_size_hints.limit_input_group_size,
4654 );
4655 }
4656
4657 if planned_query.project.len() != 1 {
4658 sql_bail!(
4659 "Expected subselect to return 1 column, got {} columns",
4660 planned_query.project.len()
4661 );
4662 }
4663
4664 let project_column = *planned_query.project.get(0).unwrap();
4665 let elem_type = qcx
4666 .relation_type(&planned_query.expr)
4667 .column_types
4668 .get(project_column)
4669 .cloned()
4670 .unwrap()
4671 .scalar_type();
4672
4673 if is_unsupported_type(&elem_type) {
4674 bail_unsupported!(format!(
4675 "cannot build array from subquery because return type {}{}",
4676 ecx.humanize_scalar_type(&elem_type, false),
4677 vector_type_string
4678 ));
4679 }
4680
4681 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4684 vector_create(elem_type.clone()),
4685 vec![HirScalarExpr::column(project_column)],
4686 ))
4687 .chain(
4688 planned_query
4689 .order_by
4690 .iter()
4691 .map(|co| HirScalarExpr::column(co.column)),
4692 )
4693 .collect();
4694
4695 let aggregation_projection = vec![0];
4699 let aggregation_order_by = planned_query
4700 .order_by
4701 .into_iter()
4702 .enumerate()
4703 .map(|(i, order)| ColumnOrder { column: i, ..order })
4704 .collect();
4705
4706 let reduced_expr = planned_query
4707 .expr
4708 .reduce(
4709 vec![],
4710 vec![AggregateExpr {
4711 func: aggregate_concat(aggregation_order_by),
4712 expr: Box::new(HirScalarExpr::call_variadic(
4713 VariadicFunc::RecordCreate {
4714 field_names: iter::repeat(ColumnName::from(""))
4715 .take(aggregation_exprs.len())
4716 .collect(),
4717 },
4718 aggregation_exprs,
4719 )),
4720 distinct: false,
4721 }],
4722 None,
4723 )
4724 .project(aggregation_projection);
4725
4726 Ok(reduced_expr
4728 .select()
4729 .call_binary(empty_literal(elem_type), binary_concat)
4730 .into())
4731}
4732
4733fn plan_map_subquery(
4734 ecx: &ExprContext,
4735 query: &Query<Aug>,
4736) -> Result<CoercibleScalarExpr, PlanError> {
4737 if !ecx.allow_subqueries {
4738 sql_bail!("{} does not allow subqueries", ecx.name)
4739 }
4740
4741 let mut qcx = ecx.derived_query_context();
4742 let mut query = plan_query(&mut qcx, query)?;
4743 if query.limit.is_some()
4744 || !query
4745 .offset
4746 .clone()
4747 .try_into_literal_int64()
4748 .is_ok_and(|offset| offset == 0)
4749 {
4750 query.expr = HirRelationExpr::top_k(
4751 query.expr,
4752 vec![],
4753 query.order_by.clone(),
4754 query.limit,
4755 query.offset,
4756 query.group_size_hints.limit_input_group_size,
4757 );
4758 }
4759 if query.project.len() != 2 {
4760 sql_bail!(
4761 "expected map subquery to return 2 columns, got {} columns",
4762 query.project.len()
4763 );
4764 }
4765
4766 let query_types = qcx.relation_type(&query.expr).column_types;
4767 let key_column = query.project[0];
4768 let key_type = query_types[key_column].clone().scalar_type();
4769 let value_column = query.project[1];
4770 let value_type = query_types[value_column].clone().scalar_type();
4771
4772 if key_type != SqlScalarType::String {
4773 sql_bail!("cannot build map from subquery because first column is not of type text");
4774 }
4775
4776 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4777 VariadicFunc::RecordCreate {
4778 field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4779 },
4780 vec![
4781 HirScalarExpr::column(key_column),
4782 HirScalarExpr::column(value_column),
4783 ],
4784 ))
4785 .chain(
4786 query
4787 .order_by
4788 .iter()
4789 .map(|co| HirScalarExpr::column(co.column)),
4790 )
4791 .collect();
4792
4793 let expr = query
4794 .expr
4795 .reduce(
4796 vec![],
4797 vec![AggregateExpr {
4798 func: AggregateFunc::MapAgg {
4799 order_by: query
4800 .order_by
4801 .into_iter()
4802 .enumerate()
4803 .map(|(i, order)| ColumnOrder { column: i, ..order })
4804 .collect(),
4805 value_type: value_type.clone(),
4806 },
4807 expr: Box::new(HirScalarExpr::call_variadic(
4808 VariadicFunc::RecordCreate {
4809 field_names: iter::repeat(ColumnName::from(""))
4810 .take(aggregation_exprs.len())
4811 .collect(),
4812 },
4813 aggregation_exprs,
4814 )),
4815 distinct: false,
4816 }],
4817 None,
4818 )
4819 .project(vec![0]);
4820
4821 let expr = HirScalarExpr::call_variadic(
4823 VariadicFunc::Coalesce,
4824 vec![
4825 expr.select(),
4826 HirScalarExpr::literal(
4827 Datum::empty_map(),
4828 SqlScalarType::Map {
4829 value_type: Box::new(value_type),
4830 custom_id: None,
4831 },
4832 ),
4833 ],
4834 );
4835
4836 Ok(expr.into())
4837}
4838
4839fn plan_collate(
4840 ecx: &ExprContext,
4841 expr: &Expr<Aug>,
4842 collation: &UnresolvedItemName,
4843) -> Result<CoercibleScalarExpr, PlanError> {
4844 if collation.0.len() == 2
4845 && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4846 && collation.0[1] == ident!("default")
4847 {
4848 plan_expr(ecx, expr)
4849 } else {
4850 bail_unsupported!("COLLATE");
4851 }
4852}
4853
4854fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4861where
4862 E: std::borrow::Borrow<Expr<Aug>>,
4863{
4864 let mut out = vec![];
4865 for expr in exprs {
4866 out.push(plan_expr(ecx, expr.borrow())?);
4867 }
4868 Ok(out)
4869}
4870
4871fn plan_array(
4873 ecx: &ExprContext,
4874 exprs: &[Expr<Aug>],
4875 type_hint: Option<&SqlScalarType>,
4876) -> Result<CoercibleScalarExpr, PlanError> {
4877 let mut out = vec![];
4879 for expr in exprs {
4880 out.push(match expr {
4881 Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4884 _ => plan_expr(ecx, expr)?,
4885 });
4886 }
4887
4888 let type_hint = match type_hint {
4890 Some(SqlScalarType::Array(elem_type)) => {
4895 let multidimensional = out.iter().any(|e| {
4896 matches!(
4897 ecx.scalar_type(e),
4898 CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4899 )
4900 });
4901 if multidimensional {
4902 type_hint
4903 } else {
4904 Some(&**elem_type)
4905 }
4906 }
4907 Some(_) => None,
4911 None => None,
4913 };
4914
4915 let (elem_type, exprs) = if exprs.is_empty() {
4917 if let Some(elem_type) = type_hint {
4918 (elem_type.clone(), vec![])
4919 } else {
4920 sql_bail!("cannot determine type of empty array");
4921 }
4922 } else {
4923 let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4924 (ecx.scalar_type(&out[0]), out)
4925 };
4926
4927 if matches!(
4933 elem_type,
4934 SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4935 ) {
4936 bail_unsupported!(format!("{}[]", ecx.humanize_scalar_type(&elem_type, false)));
4937 }
4938
4939 Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayCreate { elem_type }, exprs).into())
4940}
4941
4942fn plan_list(
4943 ecx: &ExprContext,
4944 exprs: &[Expr<Aug>],
4945 type_hint: Option<&SqlScalarType>,
4946) -> Result<CoercibleScalarExpr, PlanError> {
4947 let (elem_type, exprs) = if exprs.is_empty() {
4948 if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
4949 (element_type.without_modifiers(), vec![])
4950 } else {
4951 sql_bail!("cannot determine type of empty list");
4952 }
4953 } else {
4954 let type_hint = match type_hint {
4955 Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
4956 _ => None,
4957 };
4958
4959 let mut out = vec![];
4960 for expr in exprs {
4961 out.push(match expr {
4962 Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
4965 _ => plan_expr(ecx, expr)?,
4966 });
4967 }
4968 let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
4969 (ecx.scalar_type(&out[0]).without_modifiers(), out)
4970 };
4971
4972 if matches!(elem_type, SqlScalarType::Char { .. }) {
4973 bail_unsupported!("char list");
4974 }
4975
4976 Ok(HirScalarExpr::call_variadic(VariadicFunc::ListCreate { elem_type }, exprs).into())
4977}
4978
4979fn plan_map(
4980 ecx: &ExprContext,
4981 entries: &[MapEntry<Aug>],
4982 type_hint: Option<&SqlScalarType>,
4983) -> Result<CoercibleScalarExpr, PlanError> {
4984 let (value_type, exprs) = if entries.is_empty() {
4985 if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
4986 (value_type.without_modifiers(), vec![])
4987 } else {
4988 sql_bail!("cannot determine type of empty map");
4989 }
4990 } else {
4991 let type_hint = match type_hint {
4992 Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
4993 _ => None,
4994 };
4995
4996 let mut keys = vec![];
4997 let mut values = vec![];
4998 for MapEntry { key, value } in entries {
4999 let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5000 let value = match value {
5001 Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5004 _ => plan_expr(ecx, value)?,
5005 };
5006 keys.push(key);
5007 values.push(value);
5008 }
5009 let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5010 let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5011 let out = itertools::interleave(keys, values).collect();
5012 (value_type, out)
5013 };
5014
5015 if matches!(value_type, SqlScalarType::Char { .. }) {
5016 bail_unsupported!("char map");
5017 }
5018
5019 let expr = HirScalarExpr::call_variadic(VariadicFunc::MapBuild { value_type }, exprs);
5020 Ok(expr.into())
5021}
5022
5023pub fn coerce_homogeneous_exprs(
5040 ecx: &ExprContext,
5041 exprs: Vec<CoercibleScalarExpr>,
5042 force_type: Option<&SqlScalarType>,
5043) -> Result<Vec<HirScalarExpr>, PlanError> {
5044 assert!(!exprs.is_empty());
5045
5046 let target_holder;
5047 let target = match force_type {
5048 Some(t) => t,
5049 None => {
5050 let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5051 target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5052 &target_holder
5053 }
5054 };
5055
5056 let mut out = Vec::new();
5058 for expr in exprs {
5059 let arg = typeconv::plan_coerce(ecx, expr, target)?;
5060 let ccx = match force_type {
5061 None => CastContext::Implicit,
5062 Some(_) => CastContext::Explicit,
5063 };
5064 match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5065 Ok(expr) => out.push(expr),
5066 Err(_) => sql_bail!(
5067 "{} could not convert type {} to {}",
5068 ecx.name,
5069 ecx.humanize_scalar_type(&ecx.scalar_type(&arg), false),
5070 ecx.humanize_scalar_type(target, false),
5071 ),
5072 }
5073 }
5074 Ok(out)
5075}
5076
5077pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5080 obe: &OrderByExpr<T>,
5081 column: usize,
5082) -> ColumnOrder {
5083 let desc = !obe.asc.unwrap_or(true);
5084 ColumnOrder {
5085 column,
5086 desc,
5087 nulls_last: obe.nulls_last.unwrap_or(!desc),
5090 }
5091}
5092
5093fn plan_function_order_by(
5101 ecx: &ExprContext,
5102 order_by: &[OrderByExpr<Aug>],
5103) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5104 let mut order_by_exprs = vec![];
5105 let mut col_orders = vec![];
5106 {
5107 for (i, obe) in order_by.iter().enumerate() {
5108 let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5112 order_by_exprs.push(expr);
5113 col_orders.push(resolve_desc_and_nulls_last(obe, i));
5114 }
5115 }
5116 Ok((order_by_exprs, col_orders))
5117}
5118
5119fn plan_aggregate_common(
5121 ecx: &ExprContext,
5122 Function::<Aug> {
5123 name,
5124 args,
5125 filter,
5126 over: _,
5127 distinct,
5128 }: &Function<Aug>,
5129) -> Result<AggregateExpr, PlanError> {
5130 let impls = match resolve_func(ecx, name, args)? {
5145 Func::Aggregate(impls) => impls,
5146 _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
5147 };
5148
5149 let (args, order_by) = match &args {
5158 FunctionArgs::Star => (vec![], vec![]),
5159 FunctionArgs::Args { args, order_by } => {
5160 if args.is_empty() {
5161 sql_bail!(
5162 "{}(*) must be used to call a parameterless aggregate function",
5163 ecx.qcx
5164 .scx
5165 .humanize_resolved_name(name)
5166 .expect("name actually resolved")
5167 );
5168 }
5169 let args = plan_exprs(ecx, args)?;
5170 (args, order_by.clone())
5171 }
5172 };
5173
5174 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5175
5176 let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5177 if let Some(filter) = &filter {
5178 let cond =
5188 plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5189 let expr_typ = ecx.scalar_type(&expr);
5190 expr = HirScalarExpr::if_then_else(
5191 cond,
5192 expr,
5193 HirScalarExpr::literal(func.identity_datum(), expr_typ),
5194 );
5195 }
5196
5197 let mut seen_outer = false;
5198 let mut seen_inner = false;
5199 #[allow(deprecated)]
5200 expr.visit_columns(0, &mut |depth, col| {
5201 if depth == 0 && col.level == 0 {
5202 seen_inner = true;
5203 } else if col.level > depth {
5204 seen_outer = true;
5205 }
5206 });
5207 if seen_outer && !seen_inner {
5208 bail_unsupported!(
5209 3720,
5210 "aggregate functions that refer exclusively to outer columns"
5211 );
5212 }
5213
5214 if func.is_order_sensitive() {
5217 let field_names = iter::repeat(ColumnName::from(""))
5218 .take(1 + order_by_exprs.len())
5219 .collect();
5220 let mut exprs = vec![expr];
5221 exprs.extend(order_by_exprs);
5222 expr = HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs);
5223 }
5224
5225 Ok(AggregateExpr {
5226 func,
5227 expr: Box::new(expr),
5228 distinct: *distinct,
5229 })
5230}
5231
5232fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
5233 let mut names = names.to_vec();
5234 let col_name = normalize::column_name(names.pop().unwrap());
5235
5236 if !names.is_empty() {
5238 let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
5239 let (i, i_name) = ecx.scope.resolve_table_column(
5240 &ecx.qcx.outer_scopes,
5241 &table_name,
5242 &col_name,
5243 &mut ecx.qcx.name_manager.borrow_mut(),
5244 )?;
5245 return Ok(HirScalarExpr::named_column(i, i_name));
5246 }
5247
5248 let similar_names = match ecx.scope.resolve_column(
5251 &ecx.qcx.outer_scopes,
5252 &col_name,
5253 &mut ecx.qcx.name_manager.borrow_mut(),
5254 ) {
5255 Ok((i, i_name)) => {
5256 return Ok(HirScalarExpr::named_column(i, i_name));
5257 }
5258 Err(PlanError::UnknownColumn { similar, .. }) => similar,
5259 Err(e) => return Err(e),
5260 };
5261
5262 let items = ecx.scope.items_from_table(
5265 &ecx.qcx.outer_scopes,
5266 &PartialItemName {
5267 database: None,
5268 schema: None,
5269 item: col_name.as_str().to_owned(),
5270 },
5271 )?;
5272 match items.as_slice() {
5273 [] => Err(PlanError::UnknownColumn {
5275 table: None,
5276 column: col_name,
5277 similar: similar_names,
5278 }),
5279 [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
5284 *column,
5285 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5286 )),
5287 _ => {
5290 let mut has_exists_column = None;
5291 let (exprs, field_names): (Vec<_>, Vec<_>) = items
5292 .into_iter()
5293 .filter_map(|(column, item)| {
5294 if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
5295 has_exists_column = Some(column);
5296 None
5297 } else {
5298 let expr = HirScalarExpr::named_column(
5299 column,
5300 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5301 );
5302 let name = item.column_name.clone();
5303 Some((expr, name))
5304 }
5305 })
5306 .unzip();
5307 let expr = if exprs.len() == 1 && has_exists_column.is_some() {
5309 exprs.into_element()
5310 } else {
5311 HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs)
5312 };
5313 if let Some(has_exists_column) = has_exists_column {
5314 Ok(HirScalarExpr::if_then_else(
5315 HirScalarExpr::unnamed_column(has_exists_column)
5316 .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
5317 HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
5318 expr,
5319 ))
5320 } else {
5321 Ok(expr)
5322 }
5323 }
5324 }
5325}
5326
5327fn plan_op(
5328 ecx: &ExprContext,
5329 op: &str,
5330 expr1: &Expr<Aug>,
5331 expr2: Option<&Expr<Aug>>,
5332) -> Result<HirScalarExpr, PlanError> {
5333 let impls = func::resolve_op(op)?;
5334 let args = match expr2 {
5335 None => plan_exprs(ecx, &[expr1])?,
5336 Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5337 };
5338 func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5339}
5340
/// Plans a function call that appears in scalar-expression position.
///
/// The call is dispatched on the kind of function that `name` resolves to:
///
/// * table functions are rejected;
/// * scalar functions are planned as ordinary calls (and must not carry
///   `OVER`, `DISTINCT`, `FILTER`, a `*` argument, or an in-call `ORDER BY`);
/// * scalar-window and value-window functions are planned as window
///   expressions and returned directly;
/// * aggregate functions are only accepted here when they carry an `OVER`
///   clause, in which case they are planned as window aggregates.
///
/// # Errors
///
/// Returns a `PlanError` for any of the rejections above, for unsupported
/// window features (e.g. `IGNORE NULLS` on functions other than `LAG`/`LEAD`,
/// `DISTINCT` in window aggregates, mixed unbounded/offset frames), and for
/// any error from planning arguments or selecting an implementation.
fn plan_function<'a>(
    ecx: &ExprContext,
    f @ Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &'a Function<Aug>,
) -> Result<HirScalarExpr, PlanError> {
    // Every arm except `Func::Scalar` either bails or returns; only scalar
    // functions fall through to the code after this `match`.
    let impls = match resolve_func(ecx, name, args)? {
        Func::Table(_) => {
            sql_bail!(
                "table functions are not allowed in {} (function {})",
                ecx.name,
                name
            );
        }
        Func::Scalar(impls) => {
            if over.is_some() {
                sql_bail!(
                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
                );
            }
            impls
        }
        Func::ScalarWindow(impls) => {
            // The window frame is planned (and thereby validated) but not
            // used for scalar window functions.
            let (
                ignore_nulls,
                order_by_exprs,
                col_orders,
                _window_frame,
                partition_by,
                scalar_args,
            ) = plan_window_function_non_aggr(ecx, f)?;

            // Give a targeted error when arguments were supplied. If `name`
            // does not match the pattern below, `select_impl` is left to
            // report the problem instead.
            if !scalar_args.is_empty() {
                if let ResolvedItemName::Item {
                    full_name: FullItemName { item, .. },
                    ..
                } = name
                {
                    sql_bail!(
                        "function {} has 0 parameters, but was called with {}",
                        item,
                        scalar_args.len()
                    );
                }
            }

            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;

            // No scalar window function accepts IGNORE NULLS.
            if ignore_nulls {
                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
            }

            return Ok(HirScalarExpr::windowing(WindowExpr {
                func: WindowExprType::Scalar(ScalarWindowExpr {
                    func,
                    order_by: col_orders,
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::ValueWindow(impls) => {
            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, scalar_args) =
                plan_window_function_non_aggr(ecx, f)?;

            // `select_impl` yields both the encoded arguments and the
            // selected value window function.
            let (args_encoded, func) =
                func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;

            // IGNORE NULLS is only accepted for LAG and LEAD.
            if ignore_nulls {
                match func {
                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
                }
            }

            return Ok(HirScalarExpr::windowing(WindowExpr {
                func: WindowExprType::Value(ValueWindowExpr {
                    func,
                    args: Box::new(args_encoded),
                    order_by: col_orders,
                    window_frame,
                    ignore_nulls,
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::Aggregate(_) => {
            if f.over.is_none() {
                // A non-windowed aggregate must never be planned by this
                // function.
                if ecx.allow_aggregates {
                    // Aggregates are allowed in this context, so reaching this
                    // point is reported as an internal error.
                    sql_bail!(
                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
                        name,
                    );
                } else {
                    // Aggregates are simply not permitted in this context.
                    sql_bail!(
                        "aggregate functions are not allowed in {} (function {})",
                        ecx.name,
                        name
                    );
                }
            } else {
                // Window aggregate: `agg(...) OVER (...)`.
                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
                    plan_window_function_common(ecx, &f.name, &f.over)?;

                // Frames that combine an unbounded end with an offset end are
                // rejected for window aggregates.
                match (&window_frame.start_bound, &window_frame.end_bound) {
                    (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    ) => bail_unsupported!("mixed unbounded - offset frames"),
                    // All other frame shapes are accepted.
                    (_, _) => {}
                }

                if ignore_nulls {
                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
                }

                // Plan the aggregate itself exactly as a non-windowed
                // aggregate would be planned.
                let aggregate_expr = plan_aggregate_common(ecx, f)?;

                if aggregate_expr.distinct {
                    bail_unsupported!("DISTINCT in window aggregates");
                }

                return Ok(HirScalarExpr::windowing(WindowExpr {
                    func: WindowExprType::Aggregate(AggregateWindowExpr {
                        aggregate_expr,
                        order_by: col_orders,
                        window_frame,
                    }),
                    partition_by,
                    order_by: order_by_exprs,
                }));
            }
        }
    };

    // From here on we are planning a plain scalar function call.
    if over.is_some() {
        unreachable!("If there is an OVER clause, we should have returned already above.");
    }

    if *distinct {
        sql_bail!(
            "DISTINCT specified, but {} is not an aggregate function",
            ecx.qcx
                .scx
                .humanize_resolved_name(name)
                .expect("already resolved")
        );
    }
    if filter.is_some() {
        sql_bail!(
            "FILTER specified, but {} is not an aggregate function",
            ecx.qcx
                .scx
                .humanize_resolved_name(name)
                .expect("already resolved")
        );
    }

    let scalar_args = match &args {
        FunctionArgs::Star => {
            sql_bail!(
                "* argument is invalid with non-aggregate function {}",
                ecx.qcx
                    .scx
                    .humanize_resolved_name(name)
                    .expect("already resolved")
            )
        }
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    ecx.qcx
                        .scx
                        .humanize_resolved_name(name)
                        .expect("already resolved")
                );
            }
            plan_exprs(ecx, args)?
        }
    };

    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
}
5561
/// Feature description reported (via `bail_unsupported!`) when `IGNORE NULLS`
/// is used with a window function that does not support it.
pub const IGNORE_NULLS_ERROR_MSG: &str =
    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5564
5565pub fn resolve_func(
5569 ecx: &ExprContext,
5570 name: &ResolvedItemName,
5571 args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5572) -> Result<&'static Func, PlanError> {
5573 if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5574 if let Ok(f) = i.func() {
5575 return Ok(f);
5576 }
5577 }
5578
5579 let cexprs = match args {
5582 mz_sql_parser::ast::FunctionArgs::Star => vec![],
5583 mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5584 if !order_by.is_empty() {
5585 sql_bail!(
5586 "ORDER BY specified, but {} is not an aggregate function",
5587 name
5588 );
5589 }
5590 plan_exprs(ecx, args)?
5591 }
5592 };
5593
5594 let arg_types: Vec<_> = cexprs
5595 .into_iter()
5596 .map(|ty| match ecx.scalar_type(&ty) {
5597 CoercibleScalarType::Coerced(ty) => ecx.humanize_scalar_type(&ty, false),
5598 CoercibleScalarType::Record(_) => "record".to_string(),
5599 CoercibleScalarType::Uncoerced => "unknown".to_string(),
5600 })
5601 .collect();
5602
5603 Err(PlanError::UnknownFunction {
5604 name: name.to_string(),
5605 arg_types,
5606 })
5607}
5608
5609fn plan_is_expr<'a>(
5610 ecx: &ExprContext,
5611 expr: &'a Expr<Aug>,
5612 construct: &IsExprConstruct<Aug>,
5613 not: bool,
5614) -> Result<HirScalarExpr, PlanError> {
5615 let expr = plan_expr(ecx, expr)?;
5616 let mut expr = match construct {
5617 IsExprConstruct::Null => {
5618 let expr = expr.type_as_any(ecx)?;
5623 expr.call_is_null()
5624 }
5625 IsExprConstruct::Unknown => {
5626 let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5627 expr.call_is_null()
5628 }
5629 IsExprConstruct::True => {
5630 let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5631 expr.call_unary(UnaryFunc::IsTrue(expr_func::IsTrue))
5632 }
5633 IsExprConstruct::False => {
5634 let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5635 expr.call_unary(UnaryFunc::IsFalse(expr_func::IsFalse))
5636 }
5637 IsExprConstruct::DistinctFrom(expr2) => {
5638 let expr1 = expr.type_as_any(ecx)?;
5639 let expr2 = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5640 let term1 = HirScalarExpr::variadic_or(vec![
5647 expr1.clone().call_binary(expr2.clone(), expr_func::NotEq),
5648 expr1.clone().call_is_null(),
5649 expr2.clone().call_is_null(),
5650 ]);
5651 let term2 = HirScalarExpr::variadic_or(vec![
5652 expr1.call_is_null().not(),
5653 expr2.call_is_null().not(),
5654 ]);
5655 term1.and(term2)
5656 }
5657 };
5658 if not {
5659 expr = expr.not();
5660 }
5661 Ok(expr)
5662}
5663
5664fn plan_case<'a>(
5665 ecx: &ExprContext,
5666 operand: &'a Option<Box<Expr<Aug>>>,
5667 conditions: &'a [Expr<Aug>],
5668 results: &'a [Expr<Aug>],
5669 else_result: &'a Option<Box<Expr<Aug>>>,
5670) -> Result<HirScalarExpr, PlanError> {
5671 let mut cond_exprs = Vec::new();
5672 let mut result_exprs = Vec::new();
5673 for (c, r) in conditions.iter().zip_eq(results) {
5674 let c = match operand {
5675 Some(operand) => operand.clone().equals(c.clone()),
5676 None => c.clone(),
5677 };
5678 let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5679 cond_exprs.push(cexpr);
5680 result_exprs.push(r);
5681 }
5682 result_exprs.push(match else_result {
5683 Some(else_result) => else_result,
5684 None => &Expr::Value(Value::Null),
5685 });
5686 let mut result_exprs = coerce_homogeneous_exprs(
5687 &ecx.with_name("CASE"),
5688 plan_exprs(ecx, &result_exprs)?,
5689 None,
5690 )?;
5691 let mut expr = result_exprs.pop().unwrap();
5692 assert_eq!(cond_exprs.len(), result_exprs.len());
5693 for (cexpr, rexpr) in cond_exprs
5694 .into_iter()
5695 .rev()
5696 .zip_eq(result_exprs.into_iter().rev())
5697 {
5698 expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5699 }
5700 Ok(expr)
5701}
5702
5703fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5704 let (datum, scalar_type) = match l {
5705 Value::Number(s) => {
5706 let d = strconv::parse_numeric(s.as_str())?;
5707 if !s.contains(&['E', '.'][..]) {
5708 if let Ok(n) = d.0.try_into() {
5710 (Datum::Int32(n), SqlScalarType::Int32)
5711 } else if let Ok(n) = d.0.try_into() {
5712 (Datum::Int64(n), SqlScalarType::Int64)
5713 } else {
5714 (
5715 Datum::Numeric(d),
5716 SqlScalarType::Numeric { max_scale: None },
5717 )
5718 }
5719 } else {
5720 (
5721 Datum::Numeric(d),
5722 SqlScalarType::Numeric { max_scale: None },
5723 )
5724 }
5725 }
5726 Value::HexString(_) => bail_unsupported!("hex string literals"),
5727 Value::Boolean(b) => match b {
5728 false => (Datum::False, SqlScalarType::Bool),
5729 true => (Datum::True, SqlScalarType::Bool),
5730 },
5731 Value::Interval(i) => {
5732 let i = literal::plan_interval(i)?;
5733 (Datum::Interval(i), SqlScalarType::Interval)
5734 }
5735 Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5736 Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5737 };
5738 let expr = HirScalarExpr::literal(datum, scalar_type);
5739 Ok(expr.into())
5740}
5741
5742fn plan_window_function_non_aggr<'a>(
5745 ecx: &ExprContext,
5746 Function {
5747 name,
5748 args,
5749 filter,
5750 over,
5751 distinct,
5752 }: &'a Function<Aug>,
5753) -> Result<
5754 (
5755 bool,
5756 Vec<HirScalarExpr>,
5757 Vec<ColumnOrder>,
5758 mz_expr::WindowFrame,
5759 Vec<HirScalarExpr>,
5760 Vec<CoercibleScalarExpr>,
5761 ),
5762 PlanError,
5763> {
5764 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5765 plan_window_function_common(ecx, name, over)?;
5766
5767 if *distinct {
5768 sql_bail!(
5769 "DISTINCT specified, but {} is not an aggregate function",
5770 name
5771 );
5772 }
5773
5774 if filter.is_some() {
5775 bail_unsupported!("FILTER in non-aggregate window functions");
5776 }
5777
5778 let scalar_args = match &args {
5779 FunctionArgs::Star => {
5780 sql_bail!("* argument is invalid with non-aggregate function {}", name)
5781 }
5782 FunctionArgs::Args { args, order_by } => {
5783 if !order_by.is_empty() {
5784 sql_bail!(
5785 "ORDER BY specified, but {} is not an aggregate function",
5786 name
5787 );
5788 }
5789 plan_exprs(ecx, args)?
5790 }
5791 };
5792
5793 Ok((
5794 ignore_nulls,
5795 order_by_exprs,
5796 col_orders,
5797 window_frame,
5798 partition,
5799 scalar_args,
5800 ))
5801}
5802
5803fn plan_window_function_common(
5805 ecx: &ExprContext,
5806 name: &<Aug as AstInfo>::ItemName,
5807 over: &Option<WindowSpec<Aug>>,
5808) -> Result<
5809 (
5810 bool,
5811 Vec<HirScalarExpr>,
5812 Vec<ColumnOrder>,
5813 mz_expr::WindowFrame,
5814 Vec<HirScalarExpr>,
5815 ),
5816 PlanError,
5817> {
5818 if !ecx.allow_windows {
5819 sql_bail!(
5820 "window functions are not allowed in {} (function {})",
5821 ecx.name,
5822 name
5823 );
5824 }
5825
5826 let window_spec = match over.as_ref() {
5827 Some(over) => over,
5828 None => sql_bail!("window function {} requires an OVER clause", name),
5829 };
5830 if window_spec.ignore_nulls && window_spec.respect_nulls {
5831 sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5832 }
5833 let window_frame = match window_spec.window_frame.as_ref() {
5834 Some(frame) => plan_window_frame(frame)?,
5835 None => mz_expr::WindowFrame::default(),
5836 };
5837 let mut partition = Vec::new();
5838 for expr in &window_spec.partition_by {
5839 partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5840 }
5841
5842 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5843
5844 Ok((
5845 window_spec.ignore_nulls,
5846 order_by_exprs,
5847 col_orders,
5848 window_frame,
5849 partition,
5850 ))
5851}
5852
5853fn plan_window_frame(
5854 WindowFrame {
5855 units,
5856 start_bound,
5857 end_bound,
5858 }: &WindowFrame,
5859) -> Result<mz_expr::WindowFrame, PlanError> {
5860 use mz_expr::WindowFrameBound::*;
5861 let units = window_frame_unit_ast_to_expr(units)?;
5862 let start_bound = window_frame_bound_ast_to_expr(start_bound);
5863 let end_bound = end_bound
5864 .as_ref()
5865 .map(window_frame_bound_ast_to_expr)
5866 .unwrap_or(CurrentRow);
5867
5868 match (&start_bound, &end_bound) {
5870 (UnboundedFollowing, _) => {
5872 sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5873 }
5874 (_, UnboundedPreceding) => {
5876 sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5877 }
5878 (CurrentRow, OffsetPreceding(_)) => {
5880 sql_bail!("frame starting from current row cannot have preceding rows")
5881 }
5882 (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5883 sql_bail!("frame starting from following row cannot have preceding rows")
5884 }
5885 (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5888 if *o1 > 1000000 || *o2 > 1000000 {
5892 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5893 }
5894 }
5895 (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5896 if *o1 > 1000000 || *o2 > 1000000 {
5897 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5898 }
5899 }
5900 (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5901 if *o1 > 1000000 || *o2 > 1000000 {
5902 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5903 }
5904 }
5905 (OffsetPreceding(o), CurrentRow) => {
5906 if *o > 1000000 {
5907 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5908 }
5909 }
5910 (CurrentRow, OffsetFollowing(o)) => {
5911 if *o > 1000000 {
5912 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5913 }
5914 }
5915 (_, _) => (),
5917 }
5918
5919 if units == mz_expr::WindowFrameUnits::Range
5922 && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5923 {
5924 bail_unsupported!("RANGE in non-default window frames")
5925 }
5926
5927 let frame = mz_expr::WindowFrame {
5928 units,
5929 start_bound,
5930 end_bound,
5931 };
5932 Ok(frame)
5933}
5934
5935fn window_frame_unit_ast_to_expr(
5936 unit: &WindowFrameUnits,
5937) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5938 match unit {
5939 WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5940 WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5941 WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5942 }
5943}
5944
5945fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5946 match bound {
5947 WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5948 WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5949 WindowFrameBound::Preceding(Some(offset)) => {
5950 mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5951 }
5952 WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
5953 WindowFrameBound::Following(Some(offset)) => {
5954 mz_expr::WindowFrameBound::OffsetFollowing(*offset)
5955 }
5956 }
5957}
5958
5959pub fn scalar_type_from_sql(
5960 scx: &StatementContext,
5961 data_type: &ResolvedDataType,
5962) -> Result<SqlScalarType, PlanError> {
5963 match data_type {
5964 ResolvedDataType::AnonymousList(elem_type) => {
5965 let elem_type = scalar_type_from_sql(scx, elem_type)?;
5966 if matches!(elem_type, SqlScalarType::Char { .. }) {
5967 bail_unsupported!("char list");
5968 }
5969 Ok(SqlScalarType::List {
5970 element_type: Box::new(elem_type),
5971 custom_id: None,
5972 })
5973 }
5974 ResolvedDataType::AnonymousMap {
5975 key_type,
5976 value_type,
5977 } => {
5978 match scalar_type_from_sql(scx, key_type)? {
5979 SqlScalarType::String => {}
5980 other => sql_bail!(
5981 "map key type must be {}, got {}",
5982 scx.humanize_scalar_type(&SqlScalarType::String, false),
5983 scx.humanize_scalar_type(&other, false)
5984 ),
5985 }
5986 Ok(SqlScalarType::Map {
5987 value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
5988 custom_id: None,
5989 })
5990 }
5991 ResolvedDataType::Named { id, modifiers, .. } => {
5992 scalar_type_from_catalog(scx.catalog, *id, modifiers)
5993 }
5994 ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
5995 }
5996}
5997
5998pub fn scalar_type_from_catalog(
5999 catalog: &dyn SessionCatalog,
6000 id: CatalogItemId,
6001 modifiers: &[i64],
6002) -> Result<SqlScalarType, PlanError> {
6003 let entry = catalog.get_item(&id);
6004 let type_details = match entry.type_details() {
6005 Some(type_details) => type_details,
6006 None => {
6007 sql_bail!(
6010 "internal error: {} does not refer to a type",
6011 catalog.resolve_full_name(entry.name()).to_string().quoted()
6012 );
6013 }
6014 };
6015 match &type_details.typ {
6016 CatalogType::Numeric => {
6017 let mut modifiers = modifiers.iter().fuse();
6018 let precision = match modifiers.next() {
6019 Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6020 sql_bail!(
6021 "precision for type numeric must be between 1 and {}",
6022 NUMERIC_DATUM_MAX_PRECISION,
6023 );
6024 }
6025 Some(p) => Some(*p),
6026 None => None,
6027 };
6028 let scale = match modifiers.next() {
6029 Some(scale) => {
6030 if let Some(precision) = precision {
6031 if *scale > precision {
6032 sql_bail!(
6033 "scale for type numeric must be between 0 and precision {}",
6034 precision
6035 );
6036 }
6037 }
6038 Some(NumericMaxScale::try_from(*scale)?)
6039 }
6040 None => None,
6041 };
6042 if modifiers.next().is_some() {
6043 sql_bail!("type numeric supports at most two type modifiers");
6044 }
6045 Ok(SqlScalarType::Numeric { max_scale: scale })
6046 }
6047 CatalogType::Char => {
6048 let mut modifiers = modifiers.iter().fuse();
6049 let length = match modifiers.next() {
6050 Some(l) => Some(CharLength::try_from(*l)?),
6051 None => Some(CharLength::ONE),
6052 };
6053 if modifiers.next().is_some() {
6054 sql_bail!("type character supports at most one type modifier");
6055 }
6056 Ok(SqlScalarType::Char { length })
6057 }
6058 CatalogType::VarChar => {
6059 let mut modifiers = modifiers.iter().fuse();
6060 let length = match modifiers.next() {
6061 Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6062 None => None,
6063 };
6064 if modifiers.next().is_some() {
6065 sql_bail!("type character varying supports at most one type modifier");
6066 }
6067 Ok(SqlScalarType::VarChar { max_length: length })
6068 }
6069 CatalogType::Timestamp => {
6070 let mut modifiers = modifiers.iter().fuse();
6071 let precision = match modifiers.next() {
6072 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6073 None => None,
6074 };
6075 if modifiers.next().is_some() {
6076 sql_bail!("type timestamp supports at most one type modifier");
6077 }
6078 Ok(SqlScalarType::Timestamp { precision })
6079 }
6080 CatalogType::TimestampTz => {
6081 let mut modifiers = modifiers.iter().fuse();
6082 let precision = match modifiers.next() {
6083 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6084 None => None,
6085 };
6086 if modifiers.next().is_some() {
6087 sql_bail!("type timestamp with time zone supports at most one type modifier");
6088 }
6089 Ok(SqlScalarType::TimestampTz { precision })
6090 }
6091 t => {
6092 if !modifiers.is_empty() {
6093 sql_bail!(
6094 "{} does not support type modifiers",
6095 catalog.resolve_full_name(entry.name()).to_string()
6096 );
6097 }
6098 match t {
6099 CatalogType::Array {
6100 element_reference: element_id,
6101 } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6102 catalog,
6103 *element_id,
6104 modifiers,
6105 )?))),
6106 CatalogType::List {
6107 element_reference: element_id,
6108 element_modifiers,
6109 } => Ok(SqlScalarType::List {
6110 element_type: Box::new(scalar_type_from_catalog(
6111 catalog,
6112 *element_id,
6113 element_modifiers,
6114 )?),
6115 custom_id: Some(id),
6116 }),
6117 CatalogType::Map {
6118 key_reference: _,
6119 key_modifiers: _,
6120 value_reference: value_id,
6121 value_modifiers,
6122 } => Ok(SqlScalarType::Map {
6123 value_type: Box::new(scalar_type_from_catalog(
6124 catalog,
6125 *value_id,
6126 value_modifiers,
6127 )?),
6128 custom_id: Some(id),
6129 }),
6130 CatalogType::Range {
6131 element_reference: element_id,
6132 } => Ok(SqlScalarType::Range {
6133 element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6134 }),
6135 CatalogType::Record { fields } => {
6136 let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6137 .iter()
6138 .map(|f| {
6139 let scalar_type = scalar_type_from_catalog(
6140 catalog,
6141 f.type_reference,
6142 &f.type_modifiers,
6143 )?;
6144 Ok((
6145 f.name.clone(),
6146 SqlColumnType {
6147 scalar_type,
6148 nullable: true,
6149 },
6150 ))
6151 })
6152 .collect::<Result<Box<_>, PlanError>>()?;
6153 Ok(SqlScalarType::Record {
6154 fields: scalars,
6155 custom_id: Some(id),
6156 })
6157 }
6158 CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6159 CatalogType::Bool => Ok(SqlScalarType::Bool),
6160 CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6161 CatalogType::Date => Ok(SqlScalarType::Date),
6162 CatalogType::Float32 => Ok(SqlScalarType::Float32),
6163 CatalogType::Float64 => Ok(SqlScalarType::Float64),
6164 CatalogType::Int16 => Ok(SqlScalarType::Int16),
6165 CatalogType::Int32 => Ok(SqlScalarType::Int32),
6166 CatalogType::Int64 => Ok(SqlScalarType::Int64),
6167 CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6168 CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6169 CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6170 CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6171 CatalogType::Interval => Ok(SqlScalarType::Interval),
6172 CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6173 CatalogType::Oid => Ok(SqlScalarType::Oid),
6174 CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6175 CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6176 CatalogType::Pseudo => {
6177 sql_bail!(
6178 "cannot reference pseudo type {}",
6179 catalog.resolve_full_name(entry.name()).to_string()
6180 )
6181 }
6182 CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6183 CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6184 CatalogType::RegType => Ok(SqlScalarType::RegType),
6185 CatalogType::String => Ok(SqlScalarType::String),
6186 CatalogType::Time => Ok(SqlScalarType::Time),
6187 CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6188 CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6189 CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6190 CatalogType::Numeric => unreachable!("handled above"),
6191 CatalogType::Char => unreachable!("handled above"),
6192 CatalogType::VarChar => unreachable!("handled above"),
6193 CatalogType::Timestamp => unreachable!("handled above"),
6194 CatalogType::TimestampTz => unreachable!("handled above"),
6195 }
6196 }
6197 }
6198}
6199
/// AST visitor that collects non-windowed aggregate function calls and
/// replaces table function calls found in select items with generated
/// identifiers.
struct AggregateTableFuncVisitor<'a> {
    /// Statement context used to resolve function names.
    scx: &'a StatementContext<'a>,
    /// Aggregate calls in the order they were encountered (may contain
    /// duplicates until `into_result` is called).
    aggs: Vec<Function<Aug>>,
    /// True while visiting the arguments of an aggregate call; used to reject
    /// nested aggregates.
    within_aggregate: bool,
    /// Table function calls that were replaced, mapped to the identifier that
    /// replaced them.
    tables: BTreeMap<Function<Aug>, String>,
    /// Stack of descriptions of the enclosing constructs in which table
    /// functions are not allowed; the innermost one is used in error messages.
    table_disallowed_context: Vec<&'static str>,
    /// True while visiting a select item.
    in_select_item: bool,
    /// Source of the numeric suffixes of the generated identifiers.
    id_gen: IdGen,
    /// Error recorded during the traversal, if any.
    err: Option<PlanError>,
}
6212
6213impl<'a> AggregateTableFuncVisitor<'a> {
6214 fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6215 AggregateTableFuncVisitor {
6216 scx,
6217 aggs: Vec::new(),
6218 within_aggregate: false,
6219 tables: BTreeMap::new(),
6220 table_disallowed_context: Vec::new(),
6221 in_select_item: false,
6222 id_gen: Default::default(),
6223 err: None,
6224 }
6225 }
6226
6227 fn into_result(
6228 self,
6229 ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6230 match self.err {
6231 Some(err) => Err(err),
6232 None => {
6233 let mut seen = BTreeSet::new();
6236 let aggs = self
6237 .aggs
6238 .into_iter()
6239 .filter(move |agg| seen.insert(agg.clone()))
6240 .collect();
6241 Ok((aggs, self.tables))
6242 }
6243 }
6244 }
6245}
6246
impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
    /// Records non-windowed aggregate calls and tracks the contexts in which
    /// table functions are disallowed while descending into function calls.
    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
        let item = match self.scx.get_item_by_resolved_name(&func.name) {
            Ok(i) => i,
            // Unresolvable names are skipped here without recording an error.
            Err(_) => return,
        };

        match item.func() {
            // Only aggregates without an OVER clause are collected.
            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
                if self.within_aggregate {
                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
                    return;
                }
                self.aggs.push(func.clone());
                let Function {
                    name: _,
                    args,
                    filter,
                    over: _,
                    distinct: _,
                } = func;
                // The filter is visited before `within_aggregate` is set and
                // before the disallow context is pushed.
                if let Some(filter) = filter {
                    self.visit_expr_mut(filter);
                }
                let old_within_aggregate = self.within_aggregate;
                self.within_aggregate = true;
                self.table_disallowed_context
                    .push("aggregate function calls");

                self.visit_function_args_mut(args);

                // Restore the state that was in effect before the arguments.
                self.within_aggregate = old_within_aggregate;
                self.table_disallowed_context.pop();
            }
            Ok(Func::Table { .. }) => {
                // Table functions may not be nested inside other table
                // functions.
                self.table_disallowed_context.push("other table functions");
                visit_mut::visit_function_mut(self, func);
                self.table_disallowed_context.pop();
            }
            _ => visit_mut::visit_function_mut(self, func),
        }
    }

    /// Intentionally empty: overriding without delegating means nested
    /// queries are not traversed by this visitor.
    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
    }

    /// Replaces eligible table function calls in select items with generated
    /// identifiers, and maintains the disallow-context stack for `CASE` and
    /// `COALESCE`.
    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
        // `disallowed_context`: context to push while visiting the children.
        // `func`: a clone of the table function call to replace, if any.
        let (disallowed_context, func) = match expr {
            Expr::Case { .. } => (Some("CASE"), None),
            Expr::HomogenizingFunction {
                function: HomogenizingFunction::Coalesce,
                ..
            } => (Some("COALESCE"), None),
            Expr::Function(func) if self.in_select_item => {
                let mut table_func = None;
                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
                    if let Ok(Func::Table { .. }) = item.func() {
                        // A table function inside a disallowed context is an
                        // error; the innermost context is reported.
                        if let Some(context) = self.table_disallowed_context.last() {
                            self.err = Some(sql_err!(
                                "table functions are not allowed in {} (function {})",
                                context,
                                func.name
                            ));
                            return;
                        }
                        table_func = Some(func.clone());
                    }
                }
                (None, table_func)
            }
            _ => (None, None),
        };
        if let Some(func) = func {
            // Visit the call first; `visit_function_mut` sets the disallow
            // context for its arguments.
            visit_mut::visit_expr_mut(self, expr);
            // Only plain calls (no FILTER, OVER or DISTINCT) are replaced.
            if let Function {
                name: _,
                args: _,
                filter: None,
                over: None,
                distinct: false,
            } = &func
            {
                // An id is allocated on every replacement, but identical calls
                // share the identifier stored on first insertion.
                let unique_id = self.id_gen.allocate_id();
                let id = self
                    .tables
                    .entry(func)
                    .or_insert_with(|| format!("table_func_{unique_id}"));
                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
            }
        }
        if let Some(context) = disallowed_context {
            self.table_disallowed_context.push(context);
        }

        visit_mut::visit_expr_mut(self, expr);

        // Pop exactly what was pushed above.
        if disallowed_context.is_some() {
            self.table_disallowed_context.pop();
        }
    }

    /// Marks the traversal as being inside a select item for the duration of
    /// the visit, restoring the previous flag afterwards.
    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
        let old = self.in_select_item;
        self.in_select_item = true;
        visit_mut::visit_select_item_mut(self, si);
        self.in_select_item = old;
    }
}
6368
6369#[derive(Default)]
6370struct WindowFuncCollector {
6371 window_funcs: Vec<Expr<Aug>>,
6372}
6373
6374impl WindowFuncCollector {
6375 fn into_result(self) -> Vec<Expr<Aug>> {
6376 let mut seen = BTreeSet::new();
6378 let window_funcs_dedupped = self
6379 .window_funcs
6380 .into_iter()
6381 .filter(move |expr| seen.insert(expr.clone()))
6382 .rev()
6385 .collect();
6386 window_funcs_dedupped
6387 }
6388}
6389
6390impl Visit<'_, Aug> for WindowFuncCollector {
6391 fn visit_expr(&mut self, expr: &Expr<Aug>) {
6392 match expr {
6393 Expr::Function(func) => {
6394 if func.over.is_some() {
6395 self.window_funcs.push(expr.clone());
6396 }
6397 }
6398 _ => (),
6399 }
6400 visit::visit_expr(self, expr);
6401 }
6402
6403 fn visit_query(&mut self, _query: &Query<Aug>) {
6404 }
6406}
6407
/// The setting in which a planned query or expression is going to be used.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum QueryLifetime {
    /// The only lifetime that is one-shot rather than maintained.
    OneShot,
    /// Maintained; `SHOW` is not allowed.
    Index,
    /// Maintained; `SHOW` is not allowed.
    MaterializedView,
    /// Maintained, but unlike the other maintained lifetimes it allows `SHOW`.
    Subscribe,
    /// Maintained; `SHOW` is not allowed.
    View,
    /// Maintained; `SHOW` is not allowed.
    Source,
}

impl QueryLifetime {
    /// Returns `true` only for [`QueryLifetime::OneShot`].
    ///
    /// # Panics
    ///
    /// Panics if the answer is not the exact negation of
    /// [`QueryLifetime::is_maintained`]; the two tables are kept in lockstep.
    pub fn is_one_shot(&self) -> bool {
        let one_shot = match self {
            Self::OneShot => true,
            Self::Index
            | Self::MaterializedView
            | Self::Subscribe
            | Self::View
            | Self::Source => false,
        };
        assert_eq!(!one_shot, self.is_maintained());
        one_shot
    }

    /// Returns `true` for every lifetime except [`QueryLifetime::OneShot`].
    pub fn is_maintained(&self) -> bool {
        match self {
            Self::OneShot => false,
            Self::Index
            | Self::MaterializedView
            | Self::Subscribe
            | Self::View
            | Self::Source => true,
        }
    }

    /// Returns whether `SHOW` is allowed: `true` for `OneShot` and
    /// `Subscribe`, `false` for every other lifetime.
    pub fn allow_show(&self) -> bool {
        match self {
            Self::OneShot | Self::Subscribe => true,
            Self::Index | Self::MaterializedView | Self::View | Self::Source => false,
        }
    }
}
6467
/// The name and relation description recorded for a CTE.
#[derive(Debug, Clone)]
pub struct CteDesc {
    /// The CTE's name.
    pub name: String,
    /// Description of the CTE's output relation; `resolve_table_name` reads
    /// its relation type and its column names.
    pub desc: RelationDesc,
}
6474
/// State carried around while planning a query.
#[derive(Debug, Clone)]
pub struct QueryContext<'a> {
    /// The context of the statement that contains the query.
    pub scx: &'a StatementContext<'a>,
    /// The lifetime of the query being planned.
    pub lifetime: QueryLifetime,
    /// Scopes of the enclosing queries. `derived_context` puts the newest
    /// scope at the front, so the innermost scope comes first.
    pub outer_scopes: Vec<Scope>,
    /// Relation types of the enclosing queries, kept in the same order as
    /// `outer_scopes`.
    pub outer_relation_types: Vec<SqlRelationType>,
    /// Descriptions of the CTEs known to this query, keyed by local id;
    /// `resolve_table_name` looks CTE references up here.
    pub ctes: BTreeMap<LocalId, CteDesc>,
    /// Name interner; derived contexts receive a clone of this `Rc`, so they
    /// all share the same manager.
    pub name_manager: Rc<RefCell<NameManager>>,
    /// The guard handed out by this type's `CheckedRecursion` impl.
    pub recursion_guard: RecursionGuard,
}
6492
// Lets a `QueryContext` take part in checked recursion by exposing the guard
// it stores.
impl CheckedRecursion for QueryContext<'_> {
    /// Returns the recursion guard stored in this context.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
6498
6499impl<'a> QueryContext<'a> {
6500 pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6501 QueryContext {
6502 scx,
6503 lifetime,
6504 outer_scopes: vec![],
6505 outer_relation_types: vec![],
6506 ctes: BTreeMap::new(),
6507 name_manager: Rc::new(RefCell::new(NameManager::new())),
6508 recursion_guard: RecursionGuard::with_limit(1024), }
6510 }
6511
6512 fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6513 expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6514 }
6515
6516 fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6519 let ctes = self.ctes.clone();
6520 let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6521 let outer_relation_types = iter::once(relation_type)
6522 .chain(self.outer_relation_types.clone())
6523 .collect();
6524 let name_manager = Rc::clone(&self.name_manager);
6526
6527 QueryContext {
6528 scx: self.scx,
6529 lifetime: self.lifetime,
6530 outer_scopes,
6531 outer_relation_types,
6532 ctes,
6533 name_manager,
6534 recursion_guard: self.recursion_guard.clone(),
6535 }
6536 }
6537
6538 fn empty_derived_context(&self) -> QueryContext<'a> {
6540 let scope = Scope::empty();
6541 let ty = SqlRelationType::empty();
6542 self.derived_context(scope, ty)
6543 }
6544
6545 pub fn resolve_table_name(
6548 &self,
6549 object: ResolvedItemName,
6550 ) -> Result<(HirRelationExpr, Scope), PlanError> {
6551 match object {
6552 ResolvedItemName::Item {
6553 id,
6554 full_name,
6555 version,
6556 ..
6557 } => {
6558 let name = full_name.into();
6559 let item = self.scx.get_item(&id).at_version(version);
6560 let desc = item
6561 .desc(&self.scx.catalog.resolve_full_name(item.name()))?
6562 .clone();
6563 let expr = HirRelationExpr::Get {
6564 id: Id::Global(item.global_id()),
6565 typ: desc.typ().clone(),
6566 };
6567
6568 let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6569
6570 Ok((expr, scope))
6571 }
6572 ResolvedItemName::Cte { id, name } => {
6573 let name = name.into();
6574 let cte = self.ctes.get(&id).unwrap();
6575 let expr = HirRelationExpr::Get {
6576 id: Id::Local(id),
6577 typ: cte.desc.typ().clone(),
6578 };
6579
6580 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6581
6582 Ok((expr, scope))
6583 }
6584 ResolvedItemName::ContinualTask { id, name } => {
6585 let cte = self.ctes.get(&id).unwrap();
6586 let expr = HirRelationExpr::Get {
6587 id: Id::Local(id),
6588 typ: cte.desc.typ().clone(),
6589 };
6590
6591 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6592
6593 Ok((expr, scope))
6594 }
6595 ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6596 }
6597 }
6598
6599 pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6602 self.scx.humanize_scalar_type(typ, postgres_compat)
6603 }
6604}
6605
/// Everything needed while planning a single expression.
#[derive(Debug, Clone)]
pub struct ExprContext<'a> {
    /// The context of the query that contains the expression.
    pub qcx: &'a QueryContext<'a>,
    /// A label for this kind of expression; `with_name` produces a copy with
    /// a different label.
    /// NOTE(review): presumably used in error messages -- confirm at the use sites.
    pub name: &'a str,
    /// The scope the expression is planned against.
    pub scope: &'a Scope,
    /// The relation type passed to `typ` when typing expressions in this
    /// context (see `column_type`).
    pub relation_type: &'a SqlRelationType,
    /// Whether aggregate functions are permitted here (enforced outside this
    /// struct).
    pub allow_aggregates: bool,
    /// Whether subqueries are permitted here (enforced outside this struct).
    pub allow_subqueries: bool,
    /// Whether query parameters are permitted here (enforced outside this
    /// struct).
    pub allow_parameters: bool,
    /// Whether window functions are permitted here (enforced outside this
    /// struct).
    pub allow_windows: bool,
}
6627
// An expression context has no guard of its own; it borrows the one stored in
// its query context.
impl CheckedRecursion for ExprContext<'_> {
    /// Returns the recursion guard of the enclosing query context.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.qcx.recursion_guard
    }
}
6633
6634impl<'a> ExprContext<'a> {
6635 pub fn catalog(&self) -> &dyn SessionCatalog {
6636 self.qcx.scx.catalog
6637 }
6638
6639 pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6640 let mut ecx = self.clone();
6641 ecx.name = name;
6642 ecx
6643 }
6644
6645 pub fn column_type<E>(&self, expr: &E) -> E::Type
6646 where
6647 E: AbstractExpr,
6648 {
6649 expr.typ(
6650 &self.qcx.outer_relation_types,
6651 self.relation_type,
6652 &self.qcx.scx.param_types.borrow(),
6653 )
6654 }
6655
6656 pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6657 where
6658 E: AbstractExpr,
6659 {
6660 self.column_type(expr).scalar_type()
6661 }
6662
6663 fn derived_query_context(&self) -> QueryContext<'_> {
6664 let mut scope = self.scope.clone();
6665 scope.lateral_barrier = true;
6666 self.qcx.derived_context(scope, self.relation_type.clone())
6667 }
6668
6669 pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6670 self.qcx.scx.require_feature_flag(flag)
6671 }
6672
6673 pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6674 &self.qcx.scx.param_types
6675 }
6676
6677 pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6680 self.qcx.scx.humanize_scalar_type(typ, postgres_compat)
6681 }
6682
6683 pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6684 self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6685 }
6686}
6687
6688#[derive(Debug, Clone)]
6694pub struct NameManager(BTreeSet<Arc<str>>);
6695
6696impl NameManager {
6697 pub fn new() -> Self {
6699 Self(BTreeSet::new())
6700 }
6701
6702 fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6705 let s = s.as_ref();
6706 if let Some(interned) = self.0.get(s) {
6707 Arc::clone(interned)
6708 } else {
6709 let interned: Arc<str> = Arc::from(s);
6710 self.0.insert(Arc::clone(&interned));
6711 interned
6712 }
6713 }
6714
6715 pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6718 self.intern(item.column_name.as_str())
6734 }
6735}
6736
#[cfg(test)]
mod test {
    use super::*;

    /// Equal strings must come back as the same allocation, different strings
    /// as different ones, regardless of where the input string lives.
    #[mz_ore::test]
    pub fn test_name_manager_string_interning() {
        let mut names = NameManager::new();

        let literal_hi = "hi";
        let first_hi = names.intern(literal_hi);
        let hello = names.intern("hello");

        // Different contents, different allocations.
        assert_ne!(first_hi.as_ptr(), hello.as_ptr());

        // Same contents from another literal: same allocation.
        let second_hi = names.intern("hi");
        assert_eq!(first_hi.as_ptr(), second_hi.as_ptr());

        // Assemble "hi" at runtime so that it is backed by its own buffer.
        let mut rebuilt = String::new();
        rebuilt.push(first_hi.chars().next().unwrap());
        rebuilt.push(second_hi.chars().nth(1).unwrap());
        assert_ne!(literal_hi.as_ptr(), rebuilt.as_ptr());

        // Interning the runtime-built string still yields the original copy.
        let third_hi = names.intern(rebuilt);
        assert_eq!(first_hi.as_ptr(), third_hi.as_ptr());
    }
}