1use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::virtual_syntax::AlgExcept;
50use mz_expr::{
51 Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, TableFunc,
52 func as expr_func,
53};
54use mz_ore::assert_none;
55use mz_ore::collections::CollectionExt;
56use mz_ore::error::ErrorExt;
57use mz_ore::id_gen::IdGen;
58use mz_ore::option::FallibleMapExt;
59use mz_ore::stack::{CheckedRecursion, RecursionGuard};
60use mz_ore::str::StrExt;
61use mz_repr::adt::char::CharLength;
62use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
63use mz_repr::adt::timestamp::TimestampPrecision;
64use mz_repr::adt::varchar::VarCharMaxLength;
65use mz_repr::{
66 CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector, Row,
67 RowArena, SqlColumnType, SqlRelationType, SqlScalarType, UNKNOWN_COLUMN_NAME, strconv,
68};
69use mz_sql_parser::ast::display::AstDisplay;
70use mz_sql_parser::ast::visit::Visit;
71use mz_sql_parser::ast::visit_mut::{self, VisitMut};
72use mz_sql_parser::ast::{
73 AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
74 CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
75 Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
76 JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
77 MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
78 SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
79 TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
80 WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
81};
82use mz_sql_parser::ident;
83
84use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
85use crate::func::{self, Func, FuncSpec, TableFuncImpl};
86use crate::names::{
87 Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
88};
89use crate::plan::PlanError::InvalidWmrRecursionLimit;
90use crate::plan::error::PlanError;
91use crate::plan::hir::{
92 AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
93 BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
94 HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
95 ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
96};
97use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
98use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
99use crate::plan::statement::{StatementContext, StatementDesc, show};
100use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
101use crate::plan::{
102 Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
103 literal, transform_ast,
104};
105use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
106use crate::session::vars::{self, FeatureFlag};
107use crate::{ORDINALITY_COL_NAME, normalize};
108
109#[derive(Debug)]
110pub struct PlannedRootQuery<E> {
111 pub expr: E,
112 pub desc: RelationDesc,
113 pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
114 pub scope: Scope,
115}
116
117#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
126pub fn plan_root_query(
127 scx: &StatementContext,
128 mut query: Query<Aug>,
129 lifetime: QueryLifetime,
130) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
131 transform_ast::transform(scx, &mut query)?;
132 let mut qcx = QueryContext::root(scx, lifetime);
133 let PlannedQuery {
134 mut expr,
135 scope,
136 order_by,
137 limit,
138 offset,
139 project,
140 group_size_hints,
141 } = plan_query(&mut qcx, &query)?;
142
143 let mut finishing = RowSetFinishing {
144 limit,
145 offset,
146 project,
147 order_by,
148 };
149
150 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
156
157 if lifetime.is_maintained() {
158 expr.finish_maintained(&mut finishing, group_size_hints);
159 }
160
161 let typ = qcx.relation_type(&expr);
162 let typ = SqlRelationType::new(
163 finishing
164 .project
165 .iter()
166 .map(|i| typ.column_types[*i].clone())
167 .collect(),
168 );
169 let desc = RelationDesc::new(typ, scope.column_names());
170
171 Ok(PlannedRootQuery {
172 expr,
173 desc,
174 finishing,
175 scope,
176 })
177}
178
179#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
181pub fn plan_ct_query(
182 qcx: &mut QueryContext,
183 mut query: Query<Aug>,
184) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
185 transform_ast::transform(qcx.scx, &mut query)?;
186 let PlannedQuery {
187 mut expr,
188 scope,
189 order_by,
190 limit,
191 offset,
192 project,
193 group_size_hints,
194 } = plan_query(qcx, &query)?;
195
196 let mut finishing = RowSetFinishing {
197 limit,
198 offset,
199 project,
200 order_by,
201 };
202
203 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
209
210 expr.finish_maintained(&mut finishing, group_size_hints);
211
212 let typ = qcx.relation_type(&expr);
213 let typ = SqlRelationType::new(
214 finishing
215 .project
216 .iter()
217 .map(|i| typ.column_types[*i].clone())
218 .collect(),
219 );
220 let desc = RelationDesc::new(typ, scope.column_names());
221
222 Ok(PlannedRootQuery {
223 expr,
224 desc,
225 finishing,
226 scope,
227 })
228}
229
230fn try_push_projection_order_by(
241 expr: &mut HirRelationExpr,
242 project: &mut Vec<usize>,
243 order_by: &mut Vec<ColumnOrder>,
244) -> bool {
245 let mut unproject = vec![None; expr.arity()];
246 for (out_i, in_i) in project.iter().copied().enumerate() {
247 unproject[in_i] = Some(out_i);
248 }
249 if order_by
250 .iter()
251 .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
252 {
253 let trivial_project = (0..project.len()).collect();
254 *expr = expr.take().project(mem::replace(project, trivial_project));
255 for ob in order_by {
256 ob.column = unproject[ob.column].unwrap();
257 }
258 true
259 } else {
260 false
261 }
262}
263
264pub fn plan_insert_query(
265 scx: &StatementContext,
266 table_name: ResolvedItemName,
267 columns: Vec<Ident>,
268 source: InsertSource<Aug>,
269 returning: Vec<SelectItem<Aug>>,
270) -> Result<
271 (
272 CatalogItemId,
273 HirRelationExpr,
274 PlannedRootQuery<Vec<HirScalarExpr>>,
275 ),
276 PlanError,
277> {
278 let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
279 let table = scx.get_item_by_resolved_name(&table_name)?;
280
281 if table.item_type() != CatalogItemType::Table {
283 sql_bail!(
284 "cannot insert into {} '{}'",
285 table.item_type(),
286 table_name.full_name_str()
287 );
288 }
289 let desc = table.desc(&scx.catalog.resolve_full_name(table.name()))?;
290 let mut defaults = table
291 .writable_table_details()
292 .ok_or_else(|| {
293 sql_err!(
294 "cannot insert into non-writeable table '{}'",
295 table_name.full_name_str()
296 )
297 })?
298 .to_vec();
299
300 for default in &mut defaults {
301 transform_ast::transform(scx, default)?;
302 }
303
304 if table.id().is_system() {
305 sql_bail!(
306 "cannot insert into system table '{}'",
307 table_name.full_name_str()
308 );
309 }
310
311 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
312
313 let mut source_types = Vec::with_capacity(columns.len());
315 let mut ordering = Vec::with_capacity(columns.len());
316
317 if columns.is_empty() {
318 source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
321 ordering.extend(0..desc.arity());
322 } else {
323 let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
324 .iter()
325 .enumerate()
326 .map(|(idx, (name, typ))| (name, (idx, typ)))
327 .collect();
328
329 for c in &columns {
330 if let Some((idx, typ)) = column_by_name.get(c) {
331 ordering.push(*idx);
332 source_types.push(&typ.scalar_type);
333 } else {
334 sql_bail!(
335 "column {} of relation {} does not exist",
336 c.quoted(),
337 table_name.full_name_str().quoted()
338 );
339 }
340 }
341 if let Some(dup) = columns.iter().duplicates().next() {
342 sql_bail!("column {} specified more than once", dup.quoted());
343 }
344 };
345
346 let expr = match source {
348 InsertSource::Query(mut query) => {
349 transform_ast::transform(scx, &mut query)?;
350
351 match query {
352 Query {
354 body: SetExpr::Values(Values(values)),
355 ctes,
356 order_by,
357 limit: None,
358 offset: None,
359 } if ctes.is_empty() && order_by.is_empty() => {
360 let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
361 plan_values_insert(&qcx, &names, &source_types, &values)?
362 }
363 _ => {
364 let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
365 expr
366 }
367 }
368 }
369 InsertSource::DefaultValues => {
370 HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
371 }
372 };
373
374 let expr_arity = expr.arity();
375
376 let max_columns = if columns.is_empty() {
379 desc.arity()
380 } else {
381 columns.len()
382 };
383 if expr_arity > max_columns {
384 sql_bail!("INSERT has more expressions than target columns");
385 }
386 if expr_arity < columns.len() {
388 sql_bail!("INSERT has more target columns than expressions");
389 }
390
391 source_types.truncate(expr_arity);
393 ordering.truncate(expr_arity);
394
395 let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
398 sql_err!(
399 "column {} is of type {} but expression is of type {}",
400 desc.get_name(ordering[e.column]).quoted(),
401 qcx.humanize_scalar_type(&e.target_type, false),
402 qcx.humanize_scalar_type(&e.source_type, false),
403 )
404 })?;
405
406 let mut map_exprs = vec![];
408 let mut project_key = Vec::with_capacity(desc.arity());
409
410 let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();
412
413 let column_details = desc.iter_types().zip_eq(defaults).enumerate();
414 for (col_idx, (col_typ, default)) in column_details {
415 if let Some(src_idx) = col_to_source.get(&col_idx) {
416 project_key.push(*src_idx);
417 } else {
418 let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
419 project_key.push(expr_arity + map_exprs.len());
420 map_exprs.push(hir);
421 }
422 }
423
424 let returning = {
425 let (scope, typ) = if let ResolvedItemName::Item {
426 full_name,
427 version: _,
428 ..
429 } = table_name
430 {
431 let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
432 let typ = desc.typ().clone();
433 (scope, typ)
434 } else {
435 (Scope::empty(), SqlRelationType::empty())
436 };
437 let ecx = &ExprContext {
438 qcx: &qcx,
439 name: "RETURNING clause",
440 scope: &scope,
441 relation_type: &typ,
442 allow_aggregates: false,
443 allow_subqueries: false,
444 allow_parameters: true,
445 allow_windows: false,
446 };
447 let table_func_names = BTreeMap::new();
448 let mut output_columns = vec![];
449 let mut new_exprs = vec![];
450 let mut new_type = SqlRelationType::empty();
451 for mut si in returning {
452 transform_ast::transform(scx, &mut si)?;
453 for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
454 let expr = match &select_item {
455 ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
456 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
457 };
458 output_columns.push(column_name);
459 let typ = ecx.column_type(&expr);
460 new_type.column_types.push(typ);
461 new_exprs.push(expr);
462 }
463 }
464 let desc = RelationDesc::new(new_type, output_columns);
465 let desc_arity = desc.arity();
466 PlannedRootQuery {
467 expr: new_exprs,
468 desc,
469 finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
470 scope,
471 }
472 };
473
474 Ok((
475 table.id(),
476 expr.map(map_exprs).project(project_key),
477 returning,
478 ))
479}
480
481pub fn plan_copy_item(
492 scx: &StatementContext,
493 item_name: ResolvedItemName,
494 columns: Vec<Ident>,
495) -> Result<
496 (
497 CatalogItemId,
498 RelationDesc,
499 Vec<ColumnIndex>,
500 Option<MapFilterProject>,
501 ),
502 PlanError,
503> {
504 let item = scx.get_item_by_resolved_name(&item_name)?;
505 let fullname = scx.catalog.resolve_full_name(item.name());
506 let table_desc = item.desc(&fullname)?.into_owned();
507 let mut ordering = Vec::with_capacity(columns.len());
508
509 let mfp = if let Some(table_defaults) = item.writable_table_details() {
519 let mut table_defaults = table_defaults.to_vec();
520
521 for default in &mut table_defaults {
522 transform_ast::transform(scx, default)?;
523 }
524
525 let source_column_names: Vec<_> = columns
527 .iter()
528 .cloned()
529 .map(normalize::column_name)
530 .collect();
531
532 let mut default_exprs = Vec::new();
533 let mut project_keys = Vec::with_capacity(table_desc.arity());
534
535 let column_details = table_desc.iter().zip_eq(table_defaults);
538 for ((col_name, col_type), col_default) in column_details {
539 let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
540 if let Some(src_idx) = maybe_src_idx {
541 project_keys.push(src_idx);
542 } else {
543 let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
546 let mir = hir.lower_uncorrelated()?;
547 project_keys.push(source_column_names.len() + default_exprs.len());
548 default_exprs.push(mir);
549 }
550 }
551
552 let mfp = MapFilterProject::new(source_column_names.len())
553 .map(default_exprs)
554 .project(project_keys);
555 Some(mfp)
556 } else {
557 None
558 };
559
560 let source_desc = if columns.is_empty() {
562 let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
563 ordering.extend(indexes);
564
565 table_desc
567 } else {
568 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
569 let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
570 .iter_all()
571 .map(|(idx, name, typ)| (name, (*idx, typ)))
572 .collect();
573
574 let mut names = Vec::with_capacity(columns.len());
575 let mut source_types = Vec::with_capacity(columns.len());
576
577 for c in &columns {
578 if let Some((idx, typ)) = column_by_name.get(c) {
579 ordering.push(*idx);
580 source_types.push((*typ).clone());
581 names.push(c.clone());
582 } else {
583 sql_bail!(
584 "column {} of relation {} does not exist",
585 c.quoted(),
586 item_name.full_name_str().quoted()
587 );
588 }
589 }
590 if let Some(dup) = columns.iter().duplicates().next() {
591 sql_bail!("column {} specified more than once", dup.quoted());
592 }
593
594 RelationDesc::new(SqlRelationType::new(source_types), names)
596 };
597
598 Ok((item.id(), source_desc, ordering, mfp))
599}
600
601pub fn plan_copy_from(
605 scx: &StatementContext,
606 table_name: ResolvedItemName,
607 columns: Vec<Ident>,
608) -> Result<
609 (
610 CatalogItemId,
611 RelationDesc,
612 Vec<ColumnIndex>,
613 Option<MapFilterProject>,
614 ),
615 PlanError,
616> {
617 let table = scx.get_item_by_resolved_name(&table_name)?;
618
619 if table.item_type() != CatalogItemType::Table {
621 sql_bail!(
622 "cannot insert into {} '{}'",
623 table.item_type(),
624 table_name.full_name_str()
625 );
626 }
627
628 let _ = table.writable_table_details().ok_or_else(|| {
629 sql_err!(
630 "cannot insert into non-writeable table '{}'",
631 table_name.full_name_str()
632 )
633 })?;
634
635 if table.id().is_system() {
636 sql_bail!(
637 "cannot insert into system table '{}'",
638 table_name.full_name_str()
639 );
640 }
641 let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
642
643 Ok((id, desc, ordering, mfp))
644}
645
646pub fn plan_copy_from_rows(
649 pcx: &PlanContext,
650 catalog: &dyn SessionCatalog,
651 target_id: CatalogItemId,
652 target_name: String,
653 columns: Vec<ColumnIndex>,
654 rows: Vec<mz_repr::Row>,
655) -> Result<HirRelationExpr, PlanError> {
656 let scx = StatementContext::new(Some(pcx), catalog);
657
658 let table = catalog
660 .try_get_item(&target_id)
661 .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
662 .at_version(RelationVersionSelector::Latest);
663 let desc = table.desc(&catalog.resolve_full_name(table.name()))?;
664
665 let mut defaults = table
666 .writable_table_details()
667 .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
668 .to_vec();
669
670 for default in &mut defaults {
671 transform_ast::transform(&scx, default)?;
672 }
673
674 let column_types = columns
675 .iter()
676 .map(|x| desc.get_type(x).clone())
677 .map(|mut x| {
678 x.nullable = true;
681 x
682 })
683 .collect();
684 let typ = SqlRelationType::new(column_types);
685 let expr = HirRelationExpr::Constant {
686 rows,
687 typ: typ.clone(),
688 };
689
690 let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
696 if columns == default {
697 return Ok(expr);
698 }
699
700 let mut map_exprs = vec![];
702 let mut project_key = Vec::with_capacity(desc.arity());
703
704 let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
706
707 let column_details = desc.iter_all().zip_eq(defaults);
708 for ((col_idx, _col_name, col_typ), default) in column_details {
709 if let Some(src_idx) = col_to_source.get(&col_idx) {
710 project_key.push(*src_idx);
711 } else {
712 let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
713 project_key.push(typ.arity() + map_exprs.len());
714 map_exprs.push(hir);
715 }
716 }
717
718 Ok(expr.map(map_exprs).project(project_key))
719}
720
721pub struct ReadThenWritePlan {
723 pub id: CatalogItemId,
724 pub selection: HirRelationExpr,
729 pub assignments: BTreeMap<usize, HirScalarExpr>,
731 pub finishing: RowSetFinishing,
732}
733
734pub fn plan_delete_query(
735 scx: &StatementContext,
736 mut delete_stmt: DeleteStatement<Aug>,
737) -> Result<ReadThenWritePlan, PlanError> {
738 transform_ast::transform(scx, &mut delete_stmt)?;
739
740 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
741 plan_mutation_query_inner(
742 qcx,
743 delete_stmt.table_name,
744 delete_stmt.alias,
745 delete_stmt.using,
746 vec![],
747 delete_stmt.selection,
748 )
749}
750
751pub fn plan_update_query(
752 scx: &StatementContext,
753 mut update_stmt: UpdateStatement<Aug>,
754) -> Result<ReadThenWritePlan, PlanError> {
755 transform_ast::transform(scx, &mut update_stmt)?;
756
757 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
758
759 plan_mutation_query_inner(
760 qcx,
761 update_stmt.table_name,
762 update_stmt.alias,
763 vec![],
764 update_stmt.assignments,
765 update_stmt.selection,
766 )
767}
768
769pub fn plan_mutation_query_inner(
770 qcx: QueryContext,
771 table_name: ResolvedItemName,
772 alias: Option<TableAlias>,
773 using: Vec<TableWithJoins<Aug>>,
774 assignments: Vec<Assignment<Aug>>,
775 selection: Option<Expr<Aug>>,
776) -> Result<ReadThenWritePlan, PlanError> {
777 let (id, version) = match table_name {
779 ResolvedItemName::Item { id, version, .. } => (id, version),
780 _ => sql_bail!("cannot mutate non-user table"),
781 };
782
783 let item = qcx.scx.get_item(&id).at_version(version);
785 if item.item_type() != CatalogItemType::Table {
786 sql_bail!(
787 "cannot mutate {} '{}'",
788 item.item_type(),
789 table_name.full_name_str()
790 );
791 }
792 let _ = item.writable_table_details().ok_or_else(|| {
793 sql_err!(
794 "cannot mutate non-writeable table '{}'",
795 table_name.full_name_str()
796 )
797 })?;
798 if id.is_system() {
799 sql_bail!(
800 "cannot mutate system table '{}'",
801 table_name.full_name_str()
802 );
803 }
804
805 let (mut get, scope) = qcx.resolve_table_name(table_name)?;
807 let scope = plan_table_alias(scope, alias.as_ref())?;
808 let desc = item.desc(&qcx.scx.catalog.resolve_full_name(item.name()))?;
809 let relation_type = qcx.relation_type(&get);
810
811 if using.is_empty() {
812 if let Some(expr) = selection {
813 let ecx = &ExprContext {
814 qcx: &qcx,
815 name: "WHERE clause",
816 scope: &scope,
817 relation_type: &relation_type,
818 allow_aggregates: false,
819 allow_subqueries: true,
820 allow_parameters: true,
821 allow_windows: false,
822 };
823 let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
824 get = get.filter(vec![expr]);
825 }
826 } else {
827 get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
828 }
829
830 let mut sets = BTreeMap::new();
831 for Assignment { id, value } in assignments {
832 let name = normalize::column_name(id);
834 match desc.get_by_name(&name) {
835 Some((idx, typ)) => {
836 let ecx = &ExprContext {
837 qcx: &qcx,
838 name: "SET clause",
839 scope: &scope,
840 relation_type: &relation_type,
841 allow_aggregates: false,
842 allow_subqueries: false,
843 allow_parameters: true,
844 allow_windows: false,
845 };
846 let expr = plan_expr(ecx, &value)?.cast_to(
847 ecx,
848 CastContext::Assignment,
849 &typ.scalar_type,
850 )?;
851
852 if sets.insert(idx, expr).is_some() {
853 sql_bail!("column {} set twice", name)
854 }
855 }
856 None => sql_bail!("unknown column {}", name),
857 };
858 }
859
860 let finishing = RowSetFinishing {
861 order_by: vec![],
862 limit: None,
863 offset: 0,
864 project: (0..desc.arity()).collect(),
865 };
866
867 Ok(ReadThenWritePlan {
868 id,
869 selection: get,
870 finishing,
871 assignments: sets,
872 })
873}
874
875fn handle_mutation_using_clause(
887 qcx: &QueryContext,
888 selection: Option<Expr<Aug>>,
889 using: Vec<TableWithJoins<Aug>>,
890 get: HirRelationExpr,
891 outer_scope: Scope,
892) -> Result<HirRelationExpr, PlanError> {
893 let (mut using_rel_expr, using_scope) =
897 using.into_iter().try_fold(plan_join_identity(), |l, twj| {
898 let (left, left_scope) = l;
899 plan_join(
900 qcx,
901 left,
902 left_scope,
903 &Join {
904 relation: TableFactor::NestedJoin {
905 join: Box::new(twj),
906 alias: None,
907 },
908 join_operator: JoinOperator::CrossJoin,
909 },
910 )
911 })?;
912
913 if let Some(expr) = selection {
914 let on = HirScalarExpr::literal_true();
920 let joined = using_rel_expr
921 .clone()
922 .join(get.clone(), on, JoinKind::Inner);
923 let joined_scope = using_scope.product(outer_scope)?;
924 let joined_relation_type = qcx.relation_type(&joined);
925
926 let ecx = &ExprContext {
927 qcx,
928 name: "WHERE clause",
929 scope: &joined_scope,
930 relation_type: &joined_relation_type,
931 allow_aggregates: false,
932 allow_subqueries: true,
933 allow_parameters: true,
934 allow_windows: false,
935 };
936
937 let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
939
940 let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
944 use mz_expr::visit::Visit;
946 expr.visit_mut_post(&mut |e| {
947 if let HirScalarExpr::Column(c, _name) = e {
948 if c.column >= using_rel_arity {
949 c.level += 1;
950 c.column -= using_rel_arity;
951 };
952 }
953 })?;
954
955 using_rel_expr = using_rel_expr.filter(vec![expr]);
959 } else {
960 let _joined_scope = using_scope.product(outer_scope)?;
963 }
964 Ok(get.filter(vec![using_rel_expr.exists()]))
975}
976
977#[derive(Debug)]
978pub(crate) struct CastRelationError {
979 pub(crate) column: usize,
980 pub(crate) source_type: SqlScalarType,
981 pub(crate) target_type: SqlScalarType,
982}
983
984pub(crate) fn cast_relation<'a, I>(
988 qcx: &QueryContext,
989 ccx: CastContext,
990 expr: HirRelationExpr,
991 target_types: I,
992) -> Result<HirRelationExpr, CastRelationError>
993where
994 I: IntoIterator<Item = &'a SqlScalarType>,
995{
996 let ecx = &ExprContext {
997 qcx,
998 name: "values",
999 scope: &Scope::empty(),
1000 relation_type: &qcx.relation_type(&expr),
1001 allow_aggregates: false,
1002 allow_subqueries: true,
1003 allow_parameters: true,
1004 allow_windows: false,
1005 };
1006 let mut map_exprs = vec![];
1007 let mut project_key = vec![];
1008 for (i, target_typ) in target_types.into_iter().enumerate() {
1009 let expr = HirScalarExpr::column(i);
1010 match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
1014 Ok(cast_expr) => {
1015 if expr == cast_expr {
1016 project_key.push(i);
1018 } else {
1019 project_key.push(ecx.relation_type.arity() + map_exprs.len());
1021 map_exprs.push(cast_expr);
1022 }
1023 }
1024 Err(_) => {
1025 return Err(CastRelationError {
1026 column: i,
1027 source_type: ecx.scalar_type(&expr),
1028 target_type: target_typ.clone(),
1029 });
1030 }
1031 }
1032 }
1033 Ok(expr.map(map_exprs).project(project_key))
1034}
1035
1036pub fn plan_as_of(
1039 scx: &StatementContext,
1040 as_of: Option<AsOf<Aug>>,
1041) -> Result<QueryWhen, PlanError> {
1042 match as_of {
1043 None => Ok(QueryWhen::Immediately),
1044 Some(as_of) => match as_of {
1045 AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1046 AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1047 },
1048 }
1049}
1050
1051pub fn plan_as_of_or_up_to(
1061 scx: &StatementContext,
1062 mut expr: Expr<Aug>,
1063) -> Result<mz_repr::Timestamp, PlanError> {
1064 let scope = Scope::empty();
1065 let desc = RelationDesc::empty();
1066 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1069 transform_ast::transform(scx, &mut expr)?;
1070 let ecx = &ExprContext {
1071 qcx: &qcx,
1072 name: "AS OF or UP TO",
1073 scope: &scope,
1074 relation_type: desc.typ(),
1075 allow_aggregates: false,
1076 allow_subqueries: false,
1077 allow_parameters: false,
1078 allow_windows: false,
1079 };
1080 let hir = plan_expr(ecx, &expr)?.cast_to(
1081 ecx,
1082 CastContext::Assignment,
1083 &SqlScalarType::MzTimestamp,
1084 )?;
1085 if hir.contains_unmaterializable() {
1086 bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1087 }
1088 let timestamp = hir
1095 .into_literal_mz_timestamp()
1096 .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1097 Ok(timestamp)
1098}
1099
1100pub fn plan_secret_as(
1102 scx: &StatementContext,
1103 mut expr: Expr<Aug>,
1104) -> Result<MirScalarExpr, PlanError> {
1105 let scope = Scope::empty();
1106 let desc = RelationDesc::empty();
1107 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1108
1109 transform_ast::transform(scx, &mut expr)?;
1110
1111 let ecx = &ExprContext {
1112 qcx: &qcx,
1113 name: "AS",
1114 scope: &scope,
1115 relation_type: desc.typ(),
1116 allow_aggregates: false,
1117 allow_subqueries: false,
1118 allow_parameters: false,
1119 allow_windows: false,
1120 };
1121 let expr = plan_expr(ecx, &expr)?
1122 .type_as(ecx, &SqlScalarType::Bytes)?
1123 .lower_uncorrelated()?;
1124 Ok(expr)
1125}
1126
1127pub fn plan_webhook_validate_using(
1129 scx: &StatementContext,
1130 validate_using: CreateWebhookSourceCheck<Aug>,
1131) -> Result<WebhookValidation, PlanError> {
1132 let qcx = QueryContext::root(scx, QueryLifetime::Source);
1133
1134 let CreateWebhookSourceCheck {
1135 options,
1136 using: mut expr,
1137 } = validate_using;
1138
1139 let mut column_typs = vec![];
1140 let mut column_names = vec![];
1141
1142 let (bodies, headers, secrets) = options
1143 .map(|o| (o.bodies, o.headers, o.secrets))
1144 .unwrap_or_default();
1145
1146 let mut body_tuples = vec![];
1148 for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1149 let scalar_type = use_bytes
1150 .then_some(SqlScalarType::Bytes)
1151 .unwrap_or(SqlScalarType::String);
1152 let name = alias
1153 .map(|a| a.into_string())
1154 .unwrap_or_else(|| "body".to_string());
1155
1156 column_typs.push(SqlColumnType {
1157 scalar_type,
1158 nullable: false,
1159 });
1160 column_names.push(name);
1161
1162 let column_idx = column_typs.len() - 1;
1164 assert_eq!(
1166 column_idx,
1167 column_names.len() - 1,
1168 "body column names and types don't match"
1169 );
1170 body_tuples.push((column_idx, use_bytes));
1171 }
1172
1173 let mut header_tuples = vec![];
1175
1176 for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1177 let value_type = use_bytes
1178 .then_some(SqlScalarType::Bytes)
1179 .unwrap_or(SqlScalarType::String);
1180 let name = alias
1181 .map(|a| a.into_string())
1182 .unwrap_or_else(|| "headers".to_string());
1183
1184 column_typs.push(SqlColumnType {
1185 scalar_type: SqlScalarType::Map {
1186 value_type: Box::new(value_type),
1187 custom_id: None,
1188 },
1189 nullable: false,
1190 });
1191 column_names.push(name);
1192
1193 let column_idx = column_typs.len() - 1;
1195 assert_eq!(
1197 column_idx,
1198 column_names.len() - 1,
1199 "header column names and types don't match"
1200 );
1201 header_tuples.push((column_idx, use_bytes));
1202 }
1203
1204 let mut validation_secrets = vec![];
1206
1207 for CreateWebhookSourceSecret {
1208 secret,
1209 alias,
1210 use_bytes,
1211 } in secrets
1212 {
1213 let scalar_type = use_bytes
1215 .then_some(SqlScalarType::Bytes)
1216 .unwrap_or(SqlScalarType::String);
1217
1218 column_typs.push(SqlColumnType {
1219 scalar_type,
1220 nullable: false,
1221 });
1222 let ResolvedItemName::Item {
1223 id,
1224 full_name: FullItemName { item, .. },
1225 ..
1226 } = secret
1227 else {
1228 return Err(PlanError::InvalidSecret(Box::new(secret)));
1229 };
1230
1231 let name = if let Some(alias) = alias {
1233 alias.into_string()
1234 } else {
1235 item
1236 };
1237 column_names.push(name);
1238
1239 let column_idx = column_typs.len() - 1;
1242 assert_eq!(
1244 column_idx,
1245 column_names.len() - 1,
1246 "column names and types don't match"
1247 );
1248
1249 validation_secrets.push(WebhookValidationSecret {
1250 id,
1251 column_idx,
1252 use_bytes,
1253 });
1254 }
1255
1256 let relation_typ = SqlRelationType::new(column_typs);
1257 let desc = RelationDesc::new(relation_typ, column_names.clone());
1258 let scope = Scope::from_source(None, column_names);
1259
1260 transform_ast::transform(scx, &mut expr)?;
1261
1262 let ecx = &ExprContext {
1263 qcx: &qcx,
1264 name: "CHECK",
1265 scope: &scope,
1266 relation_type: desc.typ(),
1267 allow_aggregates: false,
1268 allow_subqueries: false,
1269 allow_parameters: false,
1270 allow_windows: false,
1271 };
1272 let expr = plan_expr(ecx, &expr)?
1273 .type_as(ecx, &SqlScalarType::Bool)?
1274 .lower_uncorrelated()?;
1275 let validation = WebhookValidation {
1276 expression: expr,
1277 relation_desc: desc,
1278 bodies: body_tuples,
1279 headers: header_tuples,
1280 secrets: validation_secrets,
1281 };
1282 Ok(validation)
1283}
1284
1285pub fn plan_default_expr(
1286 scx: &StatementContext,
1287 expr: &Expr<Aug>,
1288 target_ty: &SqlScalarType,
1289) -> Result<HirScalarExpr, PlanError> {
1290 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1291 let ecx = &ExprContext {
1292 qcx: &qcx,
1293 name: "DEFAULT expression",
1294 scope: &Scope::empty(),
1295 relation_type: &SqlRelationType::empty(),
1296 allow_aggregates: false,
1297 allow_subqueries: false,
1298 allow_parameters: false,
1299 allow_windows: false,
1300 };
1301 let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1302 Ok(hir)
1303}
1304
1305pub fn plan_params<'a>(
1306 scx: &'a StatementContext,
1307 params: Vec<Expr<Aug>>,
1308 desc: &StatementDesc,
1309) -> Result<Params, PlanError> {
1310 if params.len() != desc.param_types.len() {
1311 sql_bail!(
1312 "expected {} params, got {}",
1313 desc.param_types.len(),
1314 params.len()
1315 );
1316 }
1317
1318 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1319
1320 let mut datums = Row::default();
1321 let mut packer = datums.packer();
1322 let mut actual_types = Vec::new();
1323 let temp_storage = &RowArena::new();
1324 for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1325 transform_ast::transform(scx, &mut expr)?;
1326
1327 let ecx = execute_expr_context(&qcx);
1328 let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1329 let actual_ty = ecx.scalar_type(&ex);
1330 if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1331 return Err(PlanError::WrongParameterType(
1332 i + 1,
1333 ecx.humanize_scalar_type(expected_ty, false),
1334 ecx.humanize_scalar_type(&actual_ty, false),
1335 ));
1336 }
1337 let ex = ex.lower_uncorrelated()?;
1338 let evaled = ex.eval(&[], temp_storage)?;
1339 packer.push(evaled);
1340 actual_types.push(actual_ty);
1341 }
1342 Ok(Params {
1343 datums,
1344 execute_types: actual_types,
1345 expected_types: desc.param_types.clone(),
1346 })
1347}
1348
1349static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
1350static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1351
1352pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1354 ExprContext {
1355 qcx,
1356 name: "EXECUTE",
1357 scope: &EXECUTE_CONTEXT_SCOPE,
1358 relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1359 allow_aggregates: false,
1360 allow_subqueries: false,
1361 allow_parameters: false,
1362 allow_windows: false,
1363 }
1364}
1365
1366pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
1371 LazyLock::new(|| CastContext::Assignment);
1372
1373pub fn plan_index_exprs<'a>(
1374 scx: &'a StatementContext,
1375 on_desc: &RelationDesc,
1376 exprs: Vec<Expr<Aug>>,
1377) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1378 let scope = Scope::from_source(None, on_desc.iter_names());
1379 let qcx = QueryContext::root(scx, QueryLifetime::Index);
1380
1381 let ecx = &ExprContext {
1382 qcx: &qcx,
1383 name: "CREATE INDEX",
1384 scope: &scope,
1385 relation_type: on_desc.typ(),
1386 allow_aggregates: false,
1387 allow_subqueries: false,
1388 allow_parameters: false,
1389 allow_windows: false,
1390 };
1391 let mut out = vec![];
1392 for mut expr in exprs {
1393 transform_ast::transform(scx, &mut expr)?;
1394 let expr = plan_expr_or_col_index(ecx, &expr)?;
1395 let mut expr = expr.lower_uncorrelated()?;
1396 expr.reduce(&on_desc.typ().column_types);
1397 out.push(expr);
1398 }
1399 Ok(out)
1400}
1401
1402fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1403 match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1404 Some(column) => Ok(HirScalarExpr::column(column)),
1405 _ => plan_expr(ecx, e)?.type_as_any(ecx),
1406 }
1407}
1408
1409fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1410 match e {
1411 Expr::Value(Value::Number(n)) => {
1412 let n = n.parse::<usize>().map_err(|e| {
1413 sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1414 })?;
1415 if n < 1 || n > max {
1416 sql_bail!(
1417 "column reference {} in {} is out of range (1 - {})",
1418 n,
1419 name,
1420 max
1421 );
1422 }
1423 Ok(Some(n - 1))
1424 }
1425 _ => Ok(None),
1426 }
1427}
1428
1429struct PlannedQuery {
1430 expr: HirRelationExpr,
1431 scope: Scope,
1432 order_by: Vec<ColumnOrder>,
1433 limit: Option<HirScalarExpr>,
1434 offset: HirScalarExpr,
1440 project: Vec<usize>,
1441 group_size_hints: GroupSizeHints,
1442}
1443
1444fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1445 qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1446}
1447
1448fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1449 let cte_bindings = plan_ctes(qcx, q)?;
1452
1453 let limit = match &q.limit {
1454 None => None,
1455 Some(Limit {
1456 quantity,
1457 with_ties: false,
1458 }) => {
1459 let ecx = &ExprContext {
1460 qcx,
1461 name: "LIMIT",
1462 scope: &Scope::empty(),
1463 relation_type: &SqlRelationType::empty(),
1464 allow_aggregates: false,
1465 allow_subqueries: true,
1466 allow_parameters: true,
1467 allow_windows: false,
1468 };
1469 let limit = plan_expr(ecx, quantity)?;
1470 let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1471
1472 let limit = if limit.is_constant() {
1473 let arena = RowArena::new();
1474 let limit = limit.lower_uncorrelated()?;
1475
1476 match limit.eval(&[], &arena)? {
1480 d @ Datum::Int64(v) if v >= 0 => {
1481 HirScalarExpr::literal(d, SqlScalarType::Int64)
1482 }
1483 d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1484 Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1485 _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1486 }
1487 } else {
1488 qcx.scx
1490 .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1491 limit
1492 };
1493
1494 Some(limit)
1495 }
1496 Some(Limit {
1497 quantity: _,
1498 with_ties: true,
1499 }) => bail_unsupported!("FETCH ... WITH TIES"),
1500 };
1501
1502 let offset = match &q.offset {
1503 None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1504 Some(offset) => {
1505 let ecx = &ExprContext {
1506 qcx,
1507 name: "OFFSET",
1508 scope: &Scope::empty(),
1509 relation_type: &SqlRelationType::empty(),
1510 allow_aggregates: false,
1511 allow_subqueries: false,
1512 allow_parameters: true,
1513 allow_windows: false,
1514 };
1515 let offset = plan_expr(ecx, offset)?;
1516 let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1517
1518 let offset = if offset.is_constant() {
1519 let offset_value = offset_into_value(offset)?;
1521 HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1522 } else {
1523 if !offset.contains_parameters() {
1527 return Err(PlanError::InvalidOffset(format!(
1528 "must be simplifiable to a constant, possibly after parameter binding, got {}",
1529 offset
1530 )));
1531 }
1532 offset
1533 };
1534 offset
1535 }
1536 };
1537
1538 let mut planned_query = match &q.body {
1539 SetExpr::Select(s) => {
1540 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1542 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1543
1544 let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1545 PlannedQuery {
1546 expr: plan.expr,
1547 scope: plan.scope,
1548 order_by: plan.order_by,
1549 project: plan.project,
1550 limit,
1551 offset,
1552 group_size_hints,
1553 }
1554 }
1555 _ => {
1556 let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1557 let ecx = &ExprContext {
1558 qcx,
1559 name: "ORDER BY clause of a set expression",
1560 scope: &scope,
1561 relation_type: &qcx.relation_type(&expr),
1562 allow_aggregates: false,
1563 allow_subqueries: true,
1564 allow_parameters: true,
1565 allow_windows: false,
1566 };
1567 let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1568 let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1569 let project = (0..ecx.relation_type.arity()).collect();
1570 PlannedQuery {
1571 expr: expr.map(map_exprs),
1572 scope,
1573 order_by,
1574 limit,
1575 project,
1576 offset,
1577 group_size_hints: GroupSizeHints::default(),
1578 }
1579 }
1580 };
1581
1582 match &q.ctes {
1584 CteBlock::Simple(_) => {
1585 for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1586 if let Some(cte) = qcx.ctes.remove(&id) {
1587 planned_query.expr = HirRelationExpr::Let {
1588 name: cte.name,
1589 id: id.clone(),
1590 value: Box::new(value),
1591 body: Box::new(planned_query.expr),
1592 };
1593 }
1594 if let Some(shadowed_val) = shadowed_val {
1595 qcx.ctes.insert(id, shadowed_val);
1596 }
1597 }
1598 }
1599 CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1600 let MutRecBlockOptionExtracted {
1601 recursion_limit,
1602 return_at_recursion_limit,
1603 error_at_recursion_limit,
1604 seen: _,
1605 } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1606 let limit = match (recursion_limit, return_at_recursion_limit, error_at_recursion_limit) {
1607 (None, None, None) => None,
1608 (Some(max_iters), None, None) => Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT)),
1609 (None, Some(max_iters), None) => Some((max_iters, true)),
1610 (None, None, Some(max_iters)) => Some((max_iters, false)),
1611 _ => {
1612 return Err(InvalidWmrRecursionLimit("More than one recursion limit given. Please give at most one of RECURSION LIMIT, ERROR AT RECURSION LIMIT, RETURN AT RECURSION LIMIT.".to_owned()));
1613 }
1614 }.try_map(|(max_iters, return_at_limit)| Ok::<LetRecLimit, PlanError>(LetRecLimit {
1615 max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit("Recursion limit has to be greater than 0.".to_owned()))?,
1616 return_at_limit: *return_at_limit,
1617 }))?;
1618
1619 let mut bindings = Vec::new();
1620 for (id, value, shadowed_val) in cte_bindings.into_iter() {
1621 if let Some(cte) = qcx.ctes.remove(&id) {
1622 bindings.push((cte.name, id, value, cte.desc.into_typ()));
1623 }
1624 if let Some(shadowed_val) = shadowed_val {
1625 qcx.ctes.insert(id, shadowed_val);
1626 }
1627 }
1628 if !bindings.is_empty() {
1629 planned_query.expr = HirRelationExpr::LetRec {
1630 limit,
1631 bindings,
1632 body: Box::new(planned_query.expr),
1633 }
1634 }
1635 }
1636 }
1637
1638 Ok(planned_query)
1639}
1640
1641pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1643 let offset = offset
1644 .try_into_literal_int64()
1645 .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1646 if offset < 0 {
1647 return Err(PlanError::InvalidOffset(format!(
1648 "must not be negative, got {}",
1649 offset
1650 )));
1651 }
1652 Ok(offset)
1653}
1654
1655generate_extracted_config!(
1656 MutRecBlockOption,
1657 (RecursionLimit, u64),
1658 (ReturnAtRecursionLimit, u64),
1659 (ErrorAtRecursionLimit, u64)
1660);
1661
1662pub fn plan_ctes(
1667 qcx: &mut QueryContext,
1668 q: &Query<Aug>,
1669) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1670 let mut result = Vec::new();
1672 let mut shadowed_descs = BTreeMap::new();
1675
1676 if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1678 sql_bail!(
1679 "WITH query name {} specified more than once",
1680 normalize::ident_ref(ident).quoted()
1681 )
1682 }
1683
1684 match &q.ctes {
1685 CteBlock::Simple(ctes) => {
1686 for cte in ctes.iter() {
1688 let cte_name = normalize::ident(cte.alias.name.clone());
1689 let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1690 let typ = qcx.relation_type(&val);
1691 let mut desc = RelationDesc::new(typ, scope.column_names());
1692 plan_utils::maybe_rename_columns(
1693 format!("CTE {}", cte.alias.name),
1694 &mut desc,
1695 &cte.alias.columns,
1696 )?;
1697 let shadowed = qcx.ctes.insert(
1699 cte.id,
1700 CteDesc {
1701 name: cte_name,
1702 desc,
1703 },
1704 );
1705
1706 result.push((cte.id, val, shadowed));
1707 }
1708 }
1709 CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1710 for cte in ctes.iter() {
1712 let cte_name = normalize::ident(cte.name.clone());
1713 let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1714 for column in cte.columns.iter() {
1715 desc_columns.push((
1716 normalize::column_name(column.name.clone()),
1717 SqlColumnType {
1718 scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1719 nullable: true,
1720 },
1721 ));
1722 }
1723 let desc = RelationDesc::from_names_and_types(desc_columns);
1724 let shadowed = qcx.ctes.insert(
1725 cte.id,
1726 CteDesc {
1727 name: cte_name,
1728 desc,
1729 },
1730 );
1731 if let Some(shadowed) = shadowed {
1733 shadowed_descs.insert(cte.id, shadowed);
1734 }
1735 }
1736
1737 for cte in ctes.iter() {
1739 let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1740
1741 let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1742
1743 if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1744 sql_bail!(
1747 "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1748 );
1749 }
1750
1751 if !proposed_typ.keys.is_empty() {
1752 sql_bail!("[internal error]: WMR CTEs do not support keys");
1755 }
1756
1757 let derived_typ = qcx.relation_type(&val);
1759
1760 let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1761 let cte_name = normalize::ident(cte.name.clone());
1762 let proposed_typ = proposed_typ
1763 .column_types
1764 .iter()
1765 .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1766 .collect::<Vec<_>>();
1767 let inferred_typ = derived_typ
1768 .column_types
1769 .iter()
1770 .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1771 .collect::<Vec<_>>();
1772 Err(PlanError::RecursiveTypeMismatch(
1773 cte_name,
1774 proposed_typ,
1775 inferred_typ,
1776 ))
1777 };
1778
1779 if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1780 return type_err(proposed_typ, derived_typ);
1781 }
1782
1783 let val = match cast_relation(
1785 qcx,
1786 CastContext::Assignment,
1791 val,
1792 proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1793 ) {
1794 Ok(val) => val,
1795 Err(_) => return type_err(proposed_typ, derived_typ),
1796 };
1797
1798 result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1799 }
1800 }
1801 }
1802
1803 Ok(result)
1804}
1805
1806pub fn plan_nested_query(
1807 qcx: &mut QueryContext,
1808 q: &Query<Aug>,
1809) -> Result<(HirRelationExpr, Scope), PlanError> {
1810 let PlannedQuery {
1811 mut expr,
1812 scope,
1813 order_by,
1814 limit,
1815 offset,
1816 project,
1817 group_size_hints,
1818 } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1819 if limit.is_some()
1820 || !offset
1821 .clone()
1822 .try_into_literal_int64()
1823 .is_ok_and(|offset| offset == 0)
1824 {
1825 expr = HirRelationExpr::top_k(
1826 expr,
1827 vec![],
1828 order_by,
1829 limit,
1830 offset,
1831 group_size_hints.limit_input_group_size,
1832 );
1833 }
1834 Ok((expr.project(project), scope))
1835}
1836
1837fn plan_set_expr(
1838 qcx: &mut QueryContext,
1839 q: &SetExpr<Aug>,
1840) -> Result<(HirRelationExpr, Scope), PlanError> {
1841 match q {
1842 SetExpr::Select(select) => {
1843 let order_by_exprs = Vec::new();
1844 let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
1845 assert!(plan.order_by.is_empty());
1848 Ok((plan.expr.project(plan.project), plan.scope))
1849 }
1850 SetExpr::SetOperation {
1851 op,
1852 all,
1853 left,
1854 right,
1855 } => {
1856 let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
1858 let (right_expr, right_scope) =
1859 qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;
1860
1861 let left_type = qcx.relation_type(&left_expr);
1863 let right_type = qcx.relation_type(&right_expr);
1864 if left_type.arity() != right_type.arity() {
1865 sql_bail!(
1866 "each {} query must have the same number of columns: {} vs {}",
1867 op,
1868 left_type.arity(),
1869 right_type.arity(),
1870 );
1871 }
1872
1873 let left_ecx = &ExprContext {
1878 qcx,
1879 name: &op.to_string(),
1880 scope: &left_scope,
1881 relation_type: &left_type,
1882 allow_aggregates: false,
1883 allow_subqueries: false,
1884 allow_parameters: false,
1885 allow_windows: false,
1886 };
1887 let right_ecx = &ExprContext {
1888 qcx,
1889 name: &op.to_string(),
1890 scope: &right_scope,
1891 relation_type: &right_type,
1892 allow_aggregates: false,
1893 allow_subqueries: false,
1894 allow_parameters: false,
1895 allow_windows: false,
1896 };
1897 let mut left_casts = vec![];
1898 let mut right_casts = vec![];
1899 for (i, (left_type, right_type)) in left_type
1900 .column_types
1901 .iter()
1902 .zip_eq(right_type.column_types.iter())
1903 .enumerate()
1904 {
1905 let types = &[
1906 CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
1907 CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
1908 ];
1909 let target =
1910 typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
1911 match typeconv::plan_cast(
1912 left_ecx,
1913 CastContext::Implicit,
1914 HirScalarExpr::column(i),
1915 &target,
1916 ) {
1917 Ok(expr) => left_casts.push(expr),
1918 Err(_) => sql_bail!(
1919 "{} types {} and {} cannot be matched",
1920 op,
1921 qcx.humanize_scalar_type(&left_type.scalar_type, false),
1922 qcx.humanize_scalar_type(&target, false),
1923 ),
1924 }
1925 match typeconv::plan_cast(
1926 right_ecx,
1927 CastContext::Implicit,
1928 HirScalarExpr::column(i),
1929 &target,
1930 ) {
1931 Ok(expr) => right_casts.push(expr),
1932 Err(_) => sql_bail!(
1933 "{} types {} and {} cannot be matched",
1934 op,
1935 qcx.humanize_scalar_type(&target, false),
1936 qcx.humanize_scalar_type(&right_type.scalar_type, false),
1937 ),
1938 }
1939 }
1940 let lhs = if left_casts
1941 .iter()
1942 .enumerate()
1943 .any(|(i, e)| e != &HirScalarExpr::column(i))
1944 {
1945 let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
1946 left_expr.map(left_casts).project(project_key)
1947 } else {
1948 left_expr
1949 };
1950 let rhs = if right_casts
1951 .iter()
1952 .enumerate()
1953 .any(|(i, e)| e != &HirScalarExpr::column(i))
1954 {
1955 let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
1956 right_expr.map(right_casts).project(project_key)
1957 } else {
1958 right_expr
1959 };
1960
1961 let relation_expr = match op {
1962 SetOperator::Union => {
1963 if *all {
1964 lhs.union(rhs)
1965 } else {
1966 lhs.union(rhs).distinct()
1967 }
1968 }
1969 SetOperator::Except => Hir::except(all, lhs, rhs),
1970 SetOperator::Intersect => {
1971 let left_clone = lhs.clone();
1977 if *all {
1978 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1979 } else {
1980 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1981 .distinct()
1982 }
1983 }
1984 };
1985 let scope = Scope::from_source(
1986 None,
1987 left_scope.column_names(),
1989 );
1990
1991 Ok((relation_expr, scope))
1992 }
1993 SetExpr::Values(Values(values)) => plan_values(qcx, values),
1994 SetExpr::Table(name) => {
1995 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
1996 Ok((expr, scope))
1997 }
1998 SetExpr::Query(query) => {
1999 let (expr, scope) = plan_nested_query(qcx, query)?;
2000 Ok((expr, scope))
2001 }
2002 SetExpr::Show(stmt) => {
2003 if !qcx.lifetime.allow_show() {
2017 return Err(PlanError::ShowCommandInView);
2018 }
2019
2020 fn to_hirscope(
2023 plan: ShowCreatePlan,
2024 desc: StatementDesc,
2025 ) -> Result<(HirRelationExpr, Scope), PlanError> {
2026 let rows = vec![plan.row.iter().collect::<Vec<_>>()];
2027 let desc = desc.relation_desc.expect("must exist");
2028 let scope = Scope::from_source(None, desc.iter_names());
2029 let expr = HirRelationExpr::constant(rows, desc.into_typ());
2030 Ok((expr, scope))
2031 }
2032
2033 match stmt.clone() {
2034 ShowStatement::ShowColumns(stmt) => {
2035 show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
2036 }
2037 ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
2038 show::plan_show_create_connection(qcx.scx, stmt.clone())?,
2039 show::describe_show_create_connection(qcx.scx, stmt)?,
2040 ),
2041 ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
2042 show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
2043 show::describe_show_create_cluster(qcx.scx, stmt)?,
2044 ),
2045 ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
2046 show::plan_show_create_index(qcx.scx, stmt.clone())?,
2047 show::describe_show_create_index(qcx.scx, stmt)?,
2048 ),
2049 ShowStatement::ShowCreateSink(stmt) => to_hirscope(
2050 show::plan_show_create_sink(qcx.scx, stmt.clone())?,
2051 show::describe_show_create_sink(qcx.scx, stmt)?,
2052 ),
2053 ShowStatement::ShowCreateSource(stmt) => to_hirscope(
2054 show::plan_show_create_source(qcx.scx, stmt.clone())?,
2055 show::describe_show_create_source(qcx.scx, stmt)?,
2056 ),
2057 ShowStatement::ShowCreateTable(stmt) => to_hirscope(
2058 show::plan_show_create_table(qcx.scx, stmt.clone())?,
2059 show::describe_show_create_table(qcx.scx, stmt)?,
2060 ),
2061 ShowStatement::ShowCreateView(stmt) => to_hirscope(
2062 show::plan_show_create_view(qcx.scx, stmt.clone())?,
2063 show::describe_show_create_view(qcx.scx, stmt)?,
2064 ),
2065 ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
2066 show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
2067 show::describe_show_create_materialized_view(qcx.scx, stmt)?,
2068 ),
2069 ShowStatement::ShowCreateType(stmt) => to_hirscope(
2070 show::plan_show_create_type(qcx.scx, stmt.clone())?,
2071 show::describe_show_create_type(qcx.scx, stmt)?,
2072 ),
2073 ShowStatement::ShowObjects(stmt) => {
2074 show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
2075 }
2076 ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
2077 ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
2078 }
2079 }
2080 }
2081}
2082
2083fn plan_values(
2085 qcx: &QueryContext,
2086 values: &[Vec<Expr<Aug>>],
2087) -> Result<(HirRelationExpr, Scope), PlanError> {
2088 assert!(!values.is_empty());
2089
2090 let ecx = &ExprContext {
2091 qcx,
2092 name: "VALUES",
2093 scope: &Scope::empty(),
2094 relation_type: &SqlRelationType::empty(),
2095 allow_aggregates: false,
2096 allow_subqueries: true,
2097 allow_parameters: true,
2098 allow_windows: false,
2099 };
2100
2101 let ncols = values[0].len();
2102 let nrows = values.len();
2103
2104 let mut cols = vec![vec![]; ncols];
2107 for row in values {
2108 if row.len() != ncols {
2109 sql_bail!(
2110 "VALUES expression has varying number of columns: {} vs {}",
2111 row.len(),
2112 ncols
2113 );
2114 }
2115 for (i, v) in row.iter().enumerate() {
2116 cols[i].push(v);
2117 }
2118 }
2119
2120 let mut col_iters = Vec::with_capacity(ncols);
2122 let mut col_types = Vec::with_capacity(ncols);
2123 for col in &cols {
2124 let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2125 let mut col_type = ecx.column_type(&col[0]);
2126 for val in &col[1..] {
2127 col_type = col_type.union(&ecx.column_type(val))?;
2128 }
2129 col_types.push(col_type);
2130 col_iters.push(col.into_iter());
2131 }
2132
2133 let mut exprs = vec![];
2135 for _ in 0..nrows {
2136 for i in 0..ncols {
2137 exprs.push(col_iters[i].next().unwrap());
2138 }
2139 }
2140 let out = HirRelationExpr::CallTable {
2141 func: TableFunc::Wrap {
2142 width: ncols,
2143 types: col_types,
2144 },
2145 exprs,
2146 };
2147
2148 let mut scope = Scope::empty();
2150 for i in 0..ncols {
2151 let name = format!("column{}", i + 1);
2152 scope.items.push(ScopeItem::from_column_name(name));
2153 }
2154
2155 Ok((out, scope))
2156}
2157
2158fn plan_values_insert(
2168 qcx: &QueryContext,
2169 target_names: &[&ColumnName],
2170 target_types: &[&SqlScalarType],
2171 values: &[Vec<Expr<Aug>>],
2172) -> Result<HirRelationExpr, PlanError> {
2173 assert!(!values.is_empty());
2174
2175 if !values.iter().map(|row| row.len()).all_equal() {
2176 sql_bail!("VALUES lists must all be the same length");
2177 }
2178
2179 let ecx = &ExprContext {
2180 qcx,
2181 name: "VALUES",
2182 scope: &Scope::empty(),
2183 relation_type: &SqlRelationType::empty(),
2184 allow_aggregates: false,
2185 allow_subqueries: true,
2186 allow_parameters: true,
2187 allow_windows: false,
2188 };
2189
2190 let mut exprs = vec![];
2191 let mut types = vec![];
2192 for row in values {
2193 if row.len() > target_names.len() {
2194 sql_bail!("INSERT has more expressions than target columns");
2195 }
2196 for (column, val) in row.into_iter().enumerate() {
2197 let target_type = &target_types[column];
2198 let val = plan_expr(ecx, val)?;
2199 let val = typeconv::plan_coerce(ecx, val, target_type)?;
2200 let source_type = &ecx.scalar_type(&val);
2201 let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2202 Ok(val) => val,
2203 Err(_) => sql_bail!(
2204 "column {} is of type {} but expression is of type {}",
2205 target_names[column].quoted(),
2206 qcx.humanize_scalar_type(target_type, false),
2207 qcx.humanize_scalar_type(source_type, false),
2208 ),
2209 };
2210 if column >= types.len() {
2211 types.push(ecx.column_type(&val));
2212 } else {
2213 types[column] = types[column].union(&ecx.column_type(&val))?;
2214 }
2215 exprs.push(val);
2216 }
2217 }
2218
2219 Ok(HirRelationExpr::CallTable {
2220 func: TableFunc::Wrap {
2221 width: values[0].len(),
2222 types,
2223 },
2224 exprs,
2225 })
2226}
2227
2228fn plan_join_identity() -> (HirRelationExpr, Scope) {
2229 let typ = SqlRelationType::new(vec![]);
2230 let expr = HirRelationExpr::constant(vec![vec![]], typ);
2231 let scope = Scope::empty();
2232 (expr, scope)
2233}
2234
2235#[derive(Debug)]
2241struct SelectPlan {
2242 expr: HirRelationExpr,
2243 scope: Scope,
2244 order_by: Vec<ColumnOrder>,
2245 project: Vec<usize>,
2246}
2247
2248generate_extracted_config!(
2249 SelectOption,
2250 (ExpectedGroupSize, u64),
2251 (AggregateInputGroupSize, u64),
2252 (DistinctOnInputGroupSize, u64),
2253 (LimitInputGroupSize, u64)
2254);
2255
2256fn plan_select_from_where(
2274 qcx: &QueryContext,
2275 mut s: Select<Aug>,
2276 mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2277) -> Result<SelectPlan, PlanError> {
2278 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2285 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2286
2287 let (mut relation_expr, mut from_scope) =
2289 s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2290 let (left, left_scope) = l;
2291 plan_join(
2292 qcx,
2293 left,
2294 left_scope,
2295 &Join {
2296 relation: TableFactor::NestedJoin {
2297 join: Box::new(twj.clone()),
2298 alias: None,
2299 },
2300 join_operator: JoinOperator::CrossJoin,
2301 },
2302 )
2303 })?;
2304
2305 if let Some(selection) = &s.selection {
2307 let ecx = &ExprContext {
2308 qcx,
2309 name: "WHERE clause",
2310 scope: &from_scope,
2311 relation_type: &qcx.relation_type(&relation_expr),
2312 allow_aggregates: false,
2313 allow_subqueries: true,
2314 allow_parameters: true,
2315 allow_windows: false,
2316 };
2317 let expr = plan_expr(ecx, selection)
2318 .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2319 .type_as(ecx, &SqlScalarType::Bool)?;
2320 relation_expr = relation_expr.filter(vec![expr]);
2321 }
2322
2323 let (aggregates, table_funcs) = {
2326 let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2327 visitor.visit_select_mut(&mut s);
2328 for o in order_by_exprs.iter_mut() {
2329 visitor.visit_order_by_expr_mut(o);
2330 }
2331 visitor.into_result()?
2332 };
2333 let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2334 if !table_funcs.is_empty() {
2335 let (expr, scope) = plan_scalar_table_funcs(
2336 qcx,
2337 table_funcs,
2338 &mut table_func_names,
2339 &relation_expr,
2340 &from_scope,
2341 )?;
2342 relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2343 from_scope = from_scope.product(scope)?;
2344 }
2345
2346 let projection = {
2348 let ecx = &ExprContext {
2349 qcx,
2350 name: "SELECT clause",
2351 scope: &from_scope,
2352 relation_type: &qcx.relation_type(&relation_expr),
2353 allow_aggregates: true,
2354 allow_subqueries: true,
2355 allow_parameters: true,
2356 allow_windows: true,
2357 };
2358 let mut out = vec![];
2359 for si in &s.projection {
2360 if *si == SelectItem::Wildcard && s.from.is_empty() {
2361 sql_bail!("SELECT * with no tables specified is not valid");
2362 }
2363 out.extend(expand_select_item(ecx, si, &table_func_names)?);
2364 }
2365 out
2366 };
2367
2368 let (mut group_scope, select_all_mapping) = {
2372 let ecx = &ExprContext {
2374 qcx,
2375 name: "GROUP BY clause",
2376 scope: &from_scope,
2377 relation_type: &qcx.relation_type(&relation_expr),
2378 allow_aggregates: false,
2379 allow_subqueries: true,
2380 allow_parameters: true,
2381 allow_windows: false,
2382 };
2383 let mut group_key = vec![];
2384 let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2385 let mut group_hir_exprs = vec![];
2386 let mut group_scope = Scope::empty();
2387 let mut select_all_mapping = BTreeMap::new();
2388
2389 for group_expr in &s.group_by {
2390 let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2391 let new_column = group_key.len();
2392
2393 if let Some(group_expr) = group_expr {
2394 if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2398 existing_scope_item.exprs.insert(group_expr.clone());
2399 continue;
2400 }
2401 }
2402
2403 let mut scope_item = if let HirScalarExpr::Column(
2404 ColumnRef {
2405 level: 0,
2406 column: old_column,
2407 },
2408 _name,
2409 ) = &expr
2410 {
2411 select_all_mapping.insert(*old_column, new_column);
2417 let scope_item = ecx.scope.items[*old_column].clone();
2418 scope_item
2419 } else {
2420 ScopeItem::empty()
2421 };
2422
2423 if let Some(group_expr) = group_expr.cloned() {
2424 scope_item.exprs.insert(group_expr);
2425 }
2426
2427 group_key.push(from_scope.len() + group_exprs.len());
2428 group_hir_exprs.push(expr.clone());
2429 group_exprs.insert(expr, scope_item);
2430 }
2431
2432 assert_eq!(group_hir_exprs.len(), group_exprs.len());
2433 for expr in &group_hir_exprs {
2434 if let Some(scope_item) = group_exprs.remove(expr) {
2435 group_scope.items.push(scope_item);
2436 }
2437 }
2438
2439 let ecx = &ExprContext {
2441 qcx,
2442 name: "aggregate function",
2443 scope: &from_scope,
2444 relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2445 allow_aggregates: false,
2446 allow_subqueries: true,
2447 allow_parameters: true,
2448 allow_windows: false,
2449 };
2450 let mut agg_exprs = vec![];
2451 for sql_function in aggregates {
2452 if sql_function.over.is_some() {
2453 unreachable!(
2454 "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2455 );
2456 }
2457 agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2458 group_scope
2459 .items
2460 .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2461 }
2462 if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2463 relation_expr = relation_expr.map(group_hir_exprs).reduce(
2465 group_key,
2466 agg_exprs,
2467 group_size_hints.aggregate_input_group_size,
2468 );
2469
2470 for i in 0..from_scope.len() {
2476 if !select_all_mapping.contains_key(&i) {
2477 let scope_item = &ecx.scope.items[i];
2478 group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2479 table_name: scope_item.table_name.clone(),
2480 column_name: scope_item.column_name.clone(),
2481 allow_unqualified_references: scope_item.allow_unqualified_references,
2482 });
2483 }
2484 }
2485
2486 (group_scope, select_all_mapping)
2487 } else {
2488 (
2490 from_scope.clone(),
2491 (0..from_scope.len()).map(|i| (i, i)).collect(),
2492 )
2493 }
2494 };
2495
2496 if let Some(ref having) = s.having {
2498 let ecx = &ExprContext {
2499 qcx,
2500 name: "HAVING clause",
2501 scope: &group_scope,
2502 relation_type: &qcx.relation_type(&relation_expr),
2503 allow_aggregates: true,
2504 allow_subqueries: true,
2505 allow_parameters: true,
2506 allow_windows: false,
2507 };
2508 let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2509 relation_expr = relation_expr.filter(vec![expr]);
2510 }
2511
2512 let window_funcs = {
2525 let mut visitor = WindowFuncCollector::default();
2526 visitor.visit_select(&s);
2530 for o in order_by_exprs.iter() {
2531 visitor.visit_order_by_expr(o);
2532 }
2533 visitor.into_result()
2534 };
2535 for window_func in window_funcs {
2536 let ecx = &ExprContext {
2537 qcx,
2538 name: "window function",
2539 scope: &group_scope,
2540 relation_type: &qcx.relation_type(&relation_expr),
2541 allow_aggregates: true,
2542 allow_subqueries: true,
2543 allow_parameters: true,
2544 allow_windows: true,
2545 };
2546 relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2547 group_scope.items.push(ScopeItem::from_expr(window_func));
2548 }
2549 if let Some(ref qualify) = s.qualify {
2556 let ecx = &ExprContext {
2557 qcx,
2558 name: "QUALIFY clause",
2559 scope: &group_scope,
2560 relation_type: &qcx.relation_type(&relation_expr),
2561 allow_aggregates: true,
2562 allow_subqueries: true,
2563 allow_parameters: true,
2564 allow_windows: true,
2565 };
2566 let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2567 relation_expr = relation_expr.filter(vec![expr]);
2568 }
2569
2570 let output_columns = {
2572 let mut new_exprs = vec![];
2573 let mut new_type = qcx.relation_type(&relation_expr);
2574 let mut output_columns = vec![];
2575 for (select_item, column_name) in &projection {
2576 let ecx = &ExprContext {
2577 qcx,
2578 name: "SELECT clause",
2579 scope: &group_scope,
2580 relation_type: &new_type,
2581 allow_aggregates: true,
2582 allow_subqueries: true,
2583 allow_parameters: true,
2584 allow_windows: true,
2585 };
2586 let expr = match select_item {
2587 ExpandedSelectItem::InputOrdinal(i) => {
2588 if let Some(column) = select_all_mapping.get(i).copied() {
2589 HirScalarExpr::column(column)
2590 } else {
2591 return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2592 }
2593 }
2594 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2595 };
2596 if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2597 output_columns.push((column, column_name));
2599 } else {
2600 let typ = ecx.column_type(&expr);
2607 new_type.column_types.push(typ);
2608 new_exprs.push(expr);
2609 output_columns.push((group_scope.len(), column_name));
2610 group_scope
2611 .items
2612 .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2613 }
2614 }
2615 relation_expr = relation_expr.map(new_exprs);
2616 output_columns
2617 };
2618 let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2619
2620 let order_by = {
2622 let relation_type = qcx.relation_type(&relation_expr);
2623 let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2624 &ExprContext {
2625 qcx,
2626 name: "ORDER BY clause",
2627 scope: &group_scope,
2628 relation_type: &relation_type,
2629 allow_aggregates: true,
2630 allow_subqueries: true,
2631 allow_parameters: true,
2632 allow_windows: true,
2633 },
2634 &order_by_exprs,
2635 &output_columns,
2636 )?;
2637
2638 match s.distinct {
2639 None => relation_expr = relation_expr.map(map_exprs),
2640 Some(Distinct::EntireRow) => {
2641 if relation_type.arity() == 0 {
2642 sql_bail!("SELECT DISTINCT must have at least one column");
2643 }
2644 if !try_push_projection_order_by(
2648 &mut relation_expr,
2649 &mut project_key,
2650 &mut order_by,
2651 ) {
2652 sql_bail!(
2653 "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2654 );
2655 }
2656 assert!(map_exprs.is_empty());
2657 relation_expr = relation_expr.distinct();
2658 }
2659 Some(Distinct::On(exprs)) => {
2660 let ecx = &ExprContext {
2661 qcx,
2662 name: "DISTINCT ON clause",
2663 scope: &group_scope,
2664 relation_type: &qcx.relation_type(&relation_expr),
2665 allow_aggregates: true,
2666 allow_subqueries: true,
2667 allow_parameters: true,
2668 allow_windows: true,
2669 };
2670
2671 let mut distinct_exprs = vec![];
2672 for expr in &exprs {
2673 let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2674 distinct_exprs.push(expr);
2675 }
2676
2677 let mut distinct_key = vec![];
2678
2679 let arity = relation_type.arity();
2689 for ord in order_by.iter().take(distinct_exprs.len()) {
2690 let mut expr = &HirScalarExpr::column(ord.column);
2693 if ord.column >= arity {
2694 expr = &map_exprs[ord.column - arity];
2695 };
2696 match distinct_exprs.iter().position(move |e| e == expr) {
2697 None => sql_bail!(
2698 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2699 ),
2700 Some(pos) => {
2701 distinct_exprs.remove(pos);
2702 }
2703 }
2704 distinct_key.push(ord.column);
2705 }
2706
2707 for expr in distinct_exprs {
2709 let column = match expr {
2712 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2713 _ => {
2714 map_exprs.push(expr);
2715 arity + map_exprs.len() - 1
2716 }
2717 };
2718 distinct_key.push(column);
2719 }
2720
2721 let distinct_len = distinct_key.len();
2726 relation_expr = HirRelationExpr::top_k(
2727 relation_expr.map(map_exprs),
2728 distinct_key,
2729 order_by.iter().skip(distinct_len).cloned().collect(),
2730 Some(HirScalarExpr::literal(
2731 Datum::Int64(1),
2732 SqlScalarType::Int64,
2733 )),
2734 HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2735 group_size_hints.distinct_on_input_group_size,
2736 );
2737 }
2738 }
2739
2740 order_by
2741 };
2742
2743 let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2748
2749 Ok(SelectPlan {
2750 expr: relation_expr,
2751 scope,
2752 order_by,
2753 project: project_key,
2754 })
2755}
2756
2757fn plan_scalar_table_funcs(
2758 qcx: &QueryContext,
2759 table_funcs: BTreeMap<Function<Aug>, String>,
2760 table_func_names: &mut BTreeMap<String, Ident>,
2761 relation_expr: &HirRelationExpr,
2762 from_scope: &Scope,
2763) -> Result<(HirRelationExpr, Scope), PlanError> {
2764 let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2765
2766 for (table_func, id) in table_funcs.iter() {
2767 table_func_names.insert(
2768 id.clone(),
2769 Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2771 );
2772 }
2773 if table_funcs.len() == 1 {
2776 let (table_func, id) = table_funcs.iter().next().unwrap();
2777 let (expr, mut scope) =
2778 plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2779
2780 let num_cols = scope.len();
2782 for i in 0..scope.len() {
2783 scope.items[i].table_name = Some(PartialItemName {
2784 database: None,
2785 schema: None,
2786 item: id.clone(),
2787 });
2788 scope.items[i].from_single_column_function = num_cols == 1;
2789 scope.items[i].allow_unqualified_references = false;
2790 }
2791 return Ok((expr, scope));
2792 }
2793 let (expr, mut scope, num_cols) =
2795 plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2796
2797 let mut i = 0;
2799 for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2800 for _ in 0..num_cols {
2801 scope.items[i].table_name = Some(PartialItemName {
2802 database: None,
2803 schema: None,
2804 item: id.clone(),
2805 });
2806 scope.items[i].from_single_column_function = num_cols == 1;
2807 scope.items[i].allow_unqualified_references = false;
2808 i += 1;
2809 }
2810 scope.items[i].table_name = Some(PartialItemName {
2814 database: None,
2815 schema: None,
2816 item: id.clone(),
2817 });
2818 scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2819 scope.items[i].allow_unqualified_references = false;
2820 i += 1;
2821 }
2822 scope.items[i].allow_unqualified_references = false;
2824 Ok((expr, scope))
2825}
2826
2827fn plan_group_by_expr<'a>(
2834 ecx: &ExprContext,
2835 group_expr: &'a Expr<Aug>,
2836 projection: &'a [(ExpandedSelectItem, ColumnName)],
2837) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2838 let plan_projection = |column: usize| match &projection[column].0 {
2839 ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2840 ExpandedSelectItem::Expr(expr) => {
2841 Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2842 }
2843 };
2844
2845 if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2848 return plan_projection(column);
2849 }
2850
2851 match group_expr {
2855 Expr::Identifier(names) => match plan_identifier(ecx, names) {
2856 Err(PlanError::UnknownColumn {
2857 table: None,
2858 column,
2859 similar,
2860 }) => {
2861 let mut iter = projection.iter().map(|(_expr, name)| name);
2864 if let Some(i) = iter.position(|n| *n == column) {
2865 if iter.any(|n| *n == column) {
2866 Err(PlanError::AmbiguousColumn(column))
2867 } else {
2868 plan_projection(i)
2869 }
2870 } else {
2871 Err(PlanError::UnknownColumn {
2874 table: None,
2875 column,
2876 similar,
2877 })
2878 }
2879 }
2880 res => Ok((Some(group_expr), res?)),
2881 },
2882 _ => Ok((
2883 Some(group_expr),
2884 plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2885 )),
2886 }
2887}
2888
2889pub(crate) fn plan_order_by_exprs(
2897 ecx: &ExprContext,
2898 order_by_exprs: &[OrderByExpr<Aug>],
2899 output_columns: &[(usize, &ColumnName)],
2900) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2901 let mut order_by = vec![];
2902 let mut map_exprs = vec![];
2903 for obe in order_by_exprs {
2904 let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2905 let column = match expr {
2908 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2909 _ => {
2910 map_exprs.push(expr);
2911 ecx.relation_type.arity() + map_exprs.len() - 1
2912 }
2913 };
2914 order_by.push(resolve_desc_and_nulls_last(obe, column));
2915 }
2916 Ok((order_by, map_exprs))
2917}
2918
2919fn plan_order_by_or_distinct_expr(
2937 ecx: &ExprContext,
2938 expr: &Expr<Aug>,
2939 output_columns: &[(usize, &ColumnName)],
2940) -> Result<HirScalarExpr, PlanError> {
2941 if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2942 return Ok(HirScalarExpr::column(output_columns[i].0));
2943 }
2944
2945 if let Expr::Identifier(names) = expr {
2946 if let [name] = &names[..] {
2947 let name = normalize::column_name(name.clone());
2948 let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2949 if let Some((i, _)) = iter.next() {
2950 match iter.next() {
2951 Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2955 _ => return Ok(HirScalarExpr::column(*i)),
2956 }
2957 }
2958 }
2959 }
2960
2961 plan_expr(ecx, expr)?.type_as_any(ecx)
2962}
2963
2964fn plan_table_with_joins(
2965 qcx: &QueryContext,
2966 table_with_joins: &TableWithJoins<Aug>,
2967) -> Result<(HirRelationExpr, Scope), PlanError> {
2968 let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2969 for join in &table_with_joins.joins {
2970 let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2971 expr = new_expr;
2972 scope = new_scope;
2973 }
2974 Ok((expr, scope))
2975}
2976
2977fn plan_table_factor(
2978 qcx: &QueryContext,
2979 table_factor: &TableFactor<Aug>,
2980) -> Result<(HirRelationExpr, Scope), PlanError> {
2981 match table_factor {
2982 TableFactor::Table { name, alias } => {
2983 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2984 let scope = plan_table_alias(scope, alias.as_ref())?;
2985 Ok((expr, scope))
2986 }
2987
2988 TableFactor::Function {
2989 function,
2990 alias,
2991 with_ordinality,
2992 } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
2993
2994 TableFactor::RowsFrom {
2995 functions,
2996 alias,
2997 with_ordinality,
2998 } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
2999
3000 TableFactor::Derived {
3001 lateral,
3002 subquery,
3003 alias,
3004 } => {
3005 let mut qcx = (*qcx).clone();
3006 if !lateral {
3007 for scope in &mut qcx.outer_scopes {
3011 if scope.lateral_barrier {
3012 break;
3013 }
3014 scope.items.clear();
3015 }
3016 }
3017 qcx.outer_scopes[0].lateral_barrier = true;
3018 let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3019 let scope = plan_table_alias(scope, alias.as_ref())?;
3020 Ok((expr, scope))
3021 }
3022
3023 TableFactor::NestedJoin { join, alias } => {
3024 let (expr, scope) = plan_table_with_joins(qcx, join)?;
3025 let scope = plan_table_alias(scope, alias.as_ref())?;
3026 Ok((expr, scope))
3027 }
3028 }
3029}
3030
3031fn plan_rows_from(
3073 qcx: &QueryContext,
3074 functions: &[Function<Aug>],
3075 alias: Option<&TableAlias>,
3076 with_ordinality: bool,
3077) -> Result<(HirRelationExpr, Scope), PlanError> {
3078 if let [function] = functions {
3081 return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3082 }
3083
3084 let (expr, mut scope, num_cols) = plan_rows_from_internal(
3088 qcx,
3089 functions,
3090 Some(functions[0].name.full_item_name().clone()),
3091 )?;
3092
3093 let mut columns = Vec::new();
3095 let mut offset = 0;
3096 for (idx, cols) in num_cols.into_iter().enumerate() {
3098 for i in 0..cols {
3099 columns.push(offset + i);
3100 }
3101 offset += cols + 1;
3102
3103 scope.items.remove(offset - idx - 1);
3106 }
3107
3108 if with_ordinality {
3111 columns.push(offset);
3112 } else {
3113 scope.items.pop();
3114 }
3115
3116 let expr = expr.project(columns);
3117
3118 let scope = plan_table_alias(scope, alias)?;
3119 Ok((expr, scope))
3120}
3121
3122fn plan_rows_from_internal<'a>(
3145 qcx: &QueryContext,
3146 functions: impl IntoIterator<Item = &'a Function<Aug>>,
3147 table_name: Option<FullItemName>,
3148) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3149 let mut functions = functions.into_iter();
3150 let mut num_cols = Vec::new();
3151
3152 let (mut left_expr, mut left_scope) =
3156 plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3157 num_cols.push(left_scope.len() - 1);
3158 left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3160 left_scope
3161 .items
3162 .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));
3163
3164 for function in functions {
3165 let qcx = qcx.empty_derived_context();
3167 let (right_expr, mut right_scope) =
3168 plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3169 num_cols.push(right_scope.len() - 1);
3170 let left_col = left_scope.len() - 1;
3171 let right_col = left_scope.len() + right_scope.len() - 1;
3172 let on = HirScalarExpr::call_binary(
3173 HirScalarExpr::column(left_col),
3174 HirScalarExpr::column(right_col),
3175 expr_func::Eq,
3176 );
3177 left_expr = left_expr
3178 .join(right_expr, on, JoinKind::FullOuter)
3179 .map(vec![HirScalarExpr::call_variadic(
3180 VariadicFunc::Coalesce,
3181 vec![
3182 HirScalarExpr::column(left_col),
3183 HirScalarExpr::column(right_col),
3184 ],
3185 )]);
3186
3187 left_expr = left_expr.project(
3190 (0..left_col) .chain(left_col + 1..right_col + 2) .collect(),
3193 );
3194 right_scope.items.push(left_scope.items.pop().unwrap());
3196
3197 left_scope.items.extend(right_scope.items);
3198 }
3199
3200 Ok((left_expr, left_scope, num_cols))
3201}
3202
3203fn plan_solitary_table_function(
3207 qcx: &QueryContext,
3208 function: &Function<Aug>,
3209 alias: Option<&TableAlias>,
3210 with_ordinality: bool,
3211) -> Result<(HirRelationExpr, Scope), PlanError> {
3212 let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3213
3214 let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3215 if single_column_function {
3216 let item = &mut scope.items[0];
3217
3218 item.from_single_column_function = true;
3221
3222 if let Some(alias) = alias {
3237 if let ScopeItem {
3238 table_name: Some(table_name),
3239 column_name,
3240 ..
3241 } = item
3242 {
3243 if table_name.item.as_str() == column_name.as_str() {
3244 *column_name = normalize::column_name(alias.name.clone());
3245 }
3246 }
3247 }
3248 }
3249
3250 let scope = plan_table_alias(scope, alias)?;
3251 Ok((expr, scope))
3252}
3253
3254fn plan_table_function_internal(
3259 qcx: &QueryContext,
3260 Function {
3261 name,
3262 args,
3263 filter,
3264 over,
3265 distinct,
3266 }: &Function<Aug>,
3267 with_ordinality: bool,
3268 table_name: Option<FullItemName>,
3269) -> Result<(HirRelationExpr, Scope), PlanError> {
3270 assert_none!(filter, "cannot parse table function with FILTER");
3271 assert_none!(over, "cannot parse table function with OVER");
3272 assert!(!*distinct, "cannot parse table function with DISTINCT");
3273
3274 let ecx = &ExprContext {
3275 qcx,
3276 name: "table function arguments",
3277 scope: &Scope::empty(),
3278 relation_type: &SqlRelationType::empty(),
3279 allow_aggregates: false,
3280 allow_subqueries: true,
3281 allow_parameters: true,
3282 allow_windows: false,
3283 };
3284
3285 let scalar_args = match args {
3286 FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3287 FunctionArgs::Args { args, order_by } => {
3288 if !order_by.is_empty() {
3289 sql_bail!(
3290 "ORDER BY specified, but {} is not an aggregate function",
3291 name
3292 );
3293 }
3294 plan_exprs(ecx, args)?
3295 }
3296 };
3297
3298 let table_name = match table_name {
3299 Some(table_name) => table_name.item,
3300 None => name.full_item_name().item.clone(),
3301 };
3302
3303 let scope_name = Some(PartialItemName {
3304 database: None,
3305 schema: None,
3306 item: table_name,
3307 });
3308
3309 let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3310 Func::Table(impls) => {
3311 let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3312 let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3313 let expr = match tf.imp {
3314 TableFuncImpl::CallTable { mut func, exprs } => {
3315 if with_ordinality {
3316 func = TableFunc::with_ordinality(func.clone()).ok_or(
3317 PlanError::Unsupported {
3318 feature: format!("WITH ORDINALITY on {}", func),
3319 discussion_no: None,
3320 },
3321 )?;
3322 }
3323 HirRelationExpr::CallTable { func, exprs }
3324 }
3325 TableFuncImpl::Expr(expr) => {
3326 if !with_ordinality {
3327 expr
3328 } else {
3329 if qcx
3333 .scx
3334 .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3335 {
3336 tracing::error!(
3340 %name,
3341 "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3342 );
3343 expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3344 func: WindowExprType::Scalar(ScalarWindowExpr {
3345 func: ScalarWindowFunc::RowNumber,
3346 order_by: vec![],
3347 }),
3348 partition_by: vec![],
3349 order_by: vec![],
3350 })])
3351 } else {
3352 bail_unsupported!(format!(
3353 "WITH ORDINALITY or ROWS FROM with {}",
3354 name
3355 ));
3356 }
3357 }
3358 }
3359 };
3360 (expr, scope)
3361 }
3362 Func::Scalar(impls) => {
3363 let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3364 let output = expr.typ(
3365 &qcx.outer_relation_types,
3366 &SqlRelationType::new(vec![]),
3367 &qcx.scx.param_types.borrow(),
3368 );
3369
3370 let relation = SqlRelationType::new(vec![output]);
3371
3372 let function_ident = Ident::new(name.full_item_name().item.clone())?;
3373 let column_name = normalize::column_name(function_ident);
3374 let name = column_name.to_string();
3375
3376 let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3377
3378 let mut func = TableFunc::TabletizedScalar { relation, name };
3379 if with_ordinality {
3380 func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3381 feature: format!("WITH ORDINALITY on {}", func),
3382 discussion_no: None,
3383 })?;
3384 }
3385 (
3386 HirRelationExpr::CallTable {
3387 func,
3388 exprs: vec![expr],
3389 },
3390 scope,
3391 )
3392 }
3393 o => sql_bail!(
3394 "{} functions are not supported in functions in FROM",
3395 o.class()
3396 ),
3397 };
3398
3399 if with_ordinality {
3400 scope
3401 .items
3402 .push(ScopeItem::from_name(scope_name, "ordinality"));
3403 }
3404
3405 Ok((expr, scope))
3406}
3407
3408fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3409 if let Some(TableAlias {
3410 name,
3411 columns,
3412 strict,
3413 }) = alias
3414 {
3415 if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3416 sql_bail!(
3417 "{} has {} columns available but {} columns specified",
3418 name,
3419 scope.items.len(),
3420 columns.len()
3421 );
3422 }
3423
3424 let table_name = normalize::ident(name.to_owned());
3425 for (i, item) in scope.items.iter_mut().enumerate() {
3426 item.table_name = if item.allow_unqualified_references {
3427 Some(PartialItemName {
3428 database: None,
3429 schema: None,
3430 item: table_name.clone(),
3431 })
3432 } else {
3433 None
3467 };
3468 item.column_name = columns
3469 .get(i)
3470 .map(|a| normalize::column_name(a.clone()))
3471 .unwrap_or_else(|| item.column_name.clone());
3472 }
3473 }
3474 Ok(scope)
3475}
3476
3477fn invent_column_name(
3481 ecx: &ExprContext,
3482 expr: &Expr<Aug>,
3483 table_func_names: &BTreeMap<String, Ident>,
3484) -> Option<ColumnName> {
3485 #[derive(Debug)]
3492 enum NameQuality {
3493 Low,
3494 High,
3495 }
3496
3497 fn invent(
3498 ecx: &ExprContext,
3499 expr: &Expr<Aug>,
3500 table_func_names: &BTreeMap<String, Ident>,
3501 ) -> Option<(ColumnName, NameQuality)> {
3502 match expr {
3503 Expr::Identifier(names) => {
3504 if let [name] = names.as_slice() {
3505 if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3506 return Some((
3507 normalize::column_name(table_func_name.clone()),
3508 NameQuality::High,
3509 ));
3510 }
3511 }
3512 names
3513 .last()
3514 .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3515 }
3516 Expr::Value(v) => match v {
3517 Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3520 Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3521 _ => None,
3522 },
3523 Expr::Function(func) => {
3524 let (schema, item) = match &func.name {
3525 ResolvedItemName::Item {
3526 qualifiers,
3527 full_name,
3528 ..
3529 } => (&qualifiers.schema_spec, full_name.item.clone()),
3530 _ => unreachable!(),
3531 };
3532
3533 if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3534 || schema
3535 == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3536 {
3537 None
3538 } else {
3539 Some((item.into(), NameQuality::High))
3540 }
3541 }
3542 Expr::HomogenizingFunction { function, .. } => Some((
3543 function.to_string().to_lowercase().into(),
3544 NameQuality::High,
3545 )),
3546 Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3547 Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3548 Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3549 Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3550 Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
3551 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3552 _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3553 },
3554 Expr::Case { else_result, .. } => {
3555 match else_result
3556 .as_ref()
3557 .and_then(|else_result| invent(ecx, else_result, table_func_names))
3558 {
3559 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3560 _ => Some(("case".into(), NameQuality::Low)),
3561 }
3562 }
3563 Expr::FieldAccess { field, .. } => {
3564 Some((normalize::column_name(field.clone()), NameQuality::High))
3565 }
3566 Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3567 Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
3568 Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3569 let (_expr, scope) =
3573 plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
3574 scope
3575 .items
3576 .first()
3577 .map(|name| (name.column_name.clone(), NameQuality::High))
3578 }
3579 Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3580 _ => None,
3581 }
3582 }
3583
3584 invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
3585}
3586
3587#[derive(Debug)]
3588enum ExpandedSelectItem<'a> {
3589 InputOrdinal(usize),
3590 Expr(Cow<'a, Expr<Aug>>),
3591}
3592
3593impl ExpandedSelectItem<'_> {
3594 fn as_expr(&self) -> Option<&Expr<Aug>> {
3595 match self {
3596 ExpandedSelectItem::InputOrdinal(_) => None,
3597 ExpandedSelectItem::Expr(expr) => Some(expr),
3598 }
3599 }
3600}
3601
3602fn expand_select_item<'a>(
3603 ecx: &ExprContext,
3604 s: &'a SelectItem<Aug>,
3605 table_func_names: &BTreeMap<String, Ident>,
3606) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3607 match s {
3608 SelectItem::Expr {
3609 expr: Expr::QualifiedWildcard(table_name),
3610 alias: _,
3611 } => {
3612 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3613 let table_name =
3614 normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3615 let out: Vec<_> = ecx
3616 .scope
3617 .items
3618 .iter()
3619 .enumerate()
3620 .filter(|(_i, item)| item.is_from_table(&table_name))
3621 .map(|(i, item)| {
3622 let name = item.column_name.clone();
3623 (ExpandedSelectItem::InputOrdinal(i), name)
3624 })
3625 .collect();
3626 if out.is_empty() {
3627 sql_bail!("no table named '{}' in scope", table_name);
3628 }
3629 Ok(out)
3630 }
3631 SelectItem::Expr {
3632 expr: Expr::WildcardAccess(sql_expr),
3633 alias: _,
3634 } => {
3635 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3636 let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3642 let fields = match ecx.scalar_type(&expr) {
3643 SqlScalarType::Record { fields, .. } => fields,
3644 ty => sql_bail!(
3645 "type {} is not composite",
3646 ecx.humanize_scalar_type(&ty, false)
3647 ),
3648 };
3649 let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3650 if let Expr::Identifier(ident) = sql_expr.as_ref() {
3651 if let [name] = ident.as_slice() {
3652 if let Ok(items) = ecx.scope.items_from_table(
3653 &[],
3654 &PartialItemName {
3655 database: None,
3656 schema: None,
3657 item: name.as_str().to_string(),
3658 },
3659 ) {
3660 for (_, item) in items {
3661 if item
3662 .is_exists_column_for_a_table_function_that_was_in_the_target_list
3663 {
3664 skip_cols.insert(item.column_name.clone());
3665 }
3666 }
3667 }
3668 }
3669 }
3670 let items = fields
3671 .iter()
3672 .filter_map(|(name, _ty)| {
3673 if skip_cols.contains(name) {
3674 None
3675 } else {
3676 let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3677 expr: sql_expr.clone(),
3678 field: name.clone().into(),
3679 }));
3680 Some((item, name.clone()))
3681 }
3682 })
3683 .collect();
3684 Ok(items)
3685 }
3686 SelectItem::Wildcard => {
3687 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3688 let items: Vec<_> = ecx
3689 .scope
3690 .items
3691 .iter()
3692 .enumerate()
3693 .filter(|(_i, item)| item.allow_unqualified_references)
3694 .map(|(i, item)| {
3695 let name = item.column_name.clone();
3696 (ExpandedSelectItem::InputOrdinal(i), name)
3697 })
3698 .collect();
3699
3700 Ok(items)
3701 }
3702 SelectItem::Expr { expr, alias } => {
3703 let name = alias
3704 .clone()
3705 .map(normalize::column_name)
3706 .or_else(|| invent_column_name(ecx, expr, table_func_names))
3707 .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into());
3708 Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3709 }
3710 }
3711}
3712
3713fn plan_join(
3714 left_qcx: &QueryContext,
3715 left: HirRelationExpr,
3716 left_scope: Scope,
3717 join: &Join<Aug>,
3718) -> Result<(HirRelationExpr, Scope), PlanError> {
3719 const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3720 let (kind, constraint) = match &join.join_operator {
3721 JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3722 JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3723 JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3724 JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3725 JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3726 };
3727
3728 let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3729 if !kind.can_be_correlated() {
3730 for item in &mut right_qcx.outer_scopes[0].items {
3731 item.error_if_referenced =
3736 Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3737 table: table.cloned(),
3738 column: column.clone(),
3739 });
3740 }
3741 }
3742 let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3743
3744 let (expr, scope) = match constraint {
3745 JoinConstraint::On(expr) => {
3746 let product_scope = left_scope.product(right_scope)?;
3747 let ecx = &ExprContext {
3748 qcx: left_qcx,
3749 name: "ON clause",
3750 scope: &product_scope,
3751 relation_type: &SqlRelationType::new(
3752 left_qcx
3753 .relation_type(&left)
3754 .column_types
3755 .into_iter()
3756 .chain(right_qcx.relation_type(&right).column_types)
3757 .collect(),
3758 ),
3759 allow_aggregates: false,
3760 allow_subqueries: true,
3761 allow_parameters: true,
3762 allow_windows: false,
3763 };
3764 let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3765 let joined = left.join(right, on, kind);
3766 (joined, product_scope)
3767 }
3768 JoinConstraint::Using { columns, alias } => {
3769 let column_names = columns
3770 .iter()
3771 .map(|ident| normalize::column_name(ident.clone()))
3772 .collect::<Vec<_>>();
3773
3774 plan_using_constraint(
3775 &column_names,
3776 left_qcx,
3777 left,
3778 left_scope,
3779 &right_qcx,
3780 right,
3781 right_scope,
3782 kind,
3783 alias.as_ref(),
3784 )?
3785 }
3786 JoinConstraint::Natural => {
3787 *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3790 *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3791 let left_column_names = left_scope.column_names();
3792 let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3793 let column_names: Vec<_> = left_column_names
3794 .filter(|col| right_column_names.contains(col))
3795 .cloned()
3796 .collect();
3797 plan_using_constraint(
3798 &column_names,
3799 left_qcx,
3800 left,
3801 left_scope,
3802 &right_qcx,
3803 right,
3804 right_scope,
3805 kind,
3806 None,
3807 )?
3808 }
3809 };
3810 Ok((expr, scope))
3811}
3812
3813#[allow(clippy::too_many_arguments)]
3815fn plan_using_constraint(
3816 column_names: &[ColumnName],
3817 left_qcx: &QueryContext,
3818 left: HirRelationExpr,
3819 left_scope: Scope,
3820 right_qcx: &QueryContext,
3821 right: HirRelationExpr,
3822 right_scope: Scope,
3823 kind: JoinKind,
3824 alias: Option<&Ident>,
3825) -> Result<(HirRelationExpr, Scope), PlanError> {
3826 let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3827
3828 let mut unique_column_names = BTreeSet::new();
3831 for c in column_names {
3832 if !unique_column_names.insert(c) {
3833 return Err(PlanError::Unsupported {
3834 feature: format!(
3835 "column name {} appears more than once in USING clause",
3836 c.quoted()
3837 ),
3838 discussion_no: None,
3839 });
3840 }
3841 }
3842
3843 let alias_item_name = alias.map(|alias| PartialItemName {
3844 database: None,
3845 schema: None,
3846 item: alias.clone().to_string(),
3847 });
3848
3849 if let Some(alias_item_name) = &alias_item_name {
3850 for partial_item_name in both_scope.table_names() {
3851 if partial_item_name.matches(alias_item_name) {
3852 sql_bail!(
3853 "table name \"{}\" specified more than once",
3854 alias_item_name
3855 )
3856 }
3857 }
3858 }
3859
3860 let ecx = &ExprContext {
3861 qcx: right_qcx,
3862 name: "USING clause",
3863 scope: &both_scope,
3864 relation_type: &SqlRelationType::new(
3865 left_qcx
3866 .relation_type(&left)
3867 .column_types
3868 .into_iter()
3869 .chain(right_qcx.relation_type(&right).column_types)
3870 .collect(),
3871 ),
3872 allow_aggregates: false,
3873 allow_subqueries: false,
3874 allow_parameters: false,
3875 allow_windows: false,
3876 };
3877
3878 let mut join_exprs = vec![];
3879 let mut map_exprs = vec![];
3880 let mut new_items = vec![];
3881 let mut join_cols = vec![];
3882 let mut hidden_cols = vec![];
3883
3884 for column_name in column_names {
3885 let (lhs, lhs_name) = left_scope.resolve_using_column(
3887 column_name,
3888 JoinSide::Left,
3889 &mut left_qcx.name_manager.borrow_mut(),
3890 )?;
3891 let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3892 column_name,
3893 JoinSide::Right,
3894 &mut right_qcx.name_manager.borrow_mut(),
3895 )?;
3896
3897 rhs.column += left_scope.len();
3899
3900 let mut exprs = coerce_homogeneous_exprs(
3902 &ecx.with_name(&format!(
3903 "NATURAL/USING join column {}",
3904 column_name.quoted()
3905 )),
3906 vec![
3907 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3908 lhs,
3909 Arc::clone(&lhs_name),
3910 )),
3911 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3912 rhs,
3913 Arc::clone(&rhs_name),
3914 )),
3915 ],
3916 None,
3917 )?;
3918 let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3919
3920 match kind {
3921 JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3922 join_cols.push(lhs.column);
3923 hidden_cols.push(rhs.column);
3924 }
3925 JoinKind::RightOuter => {
3926 join_cols.push(rhs.column);
3927 hidden_cols.push(lhs.column);
3928 }
3929 JoinKind::FullOuter => {
3930 join_cols.push(both_scope.items.len() + map_exprs.len());
3933 hidden_cols.push(lhs.column);
3934 hidden_cols.push(rhs.column);
3935 map_exprs.push(HirScalarExpr::call_variadic(
3936 VariadicFunc::Coalesce,
3937 vec![expr1.clone(), expr2.clone()],
3938 ));
3939 new_items.push(ScopeItem::from_column_name(column_name));
3940 }
3941 }
3942
3943 if alias_item_name.is_some() {
3948 let new_item_col = both_scope.items.len() + new_items.len();
3949 join_cols.push(new_item_col);
3950 hidden_cols.push(new_item_col);
3951
3952 new_items.push(ScopeItem::from_name(
3953 alias_item_name.clone(),
3954 column_name.clone().to_string(),
3955 ));
3956
3957 map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
3961 }
3962
3963 join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
3964 }
3965 both_scope.items.extend(new_items);
3966
3967 for c in hidden_cols {
3971 both_scope.items[c].allow_unqualified_references = false;
3972 }
3973
3974 let project_key = join_cols
3976 .into_iter()
3977 .chain(0..both_scope.items.len())
3978 .unique()
3979 .collect::<Vec<_>>();
3980
3981 both_scope = both_scope.project(&project_key);
3982
3983 let on = HirScalarExpr::variadic_and(join_exprs);
3984
3985 let both = left
3986 .join(right, on, kind)
3987 .map(map_exprs)
3988 .project(project_key);
3989 Ok((both, both_scope))
3990}
3991
3992pub fn plan_expr<'a>(
3993 ecx: &'a ExprContext,
3994 e: &Expr<Aug>,
3995) -> Result<CoercibleScalarExpr, PlanError> {
3996 ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
3997}
3998
3999fn plan_expr_inner<'a>(
4000 ecx: &'a ExprContext,
4001 e: &Expr<Aug>,
4002) -> Result<CoercibleScalarExpr, PlanError> {
4003 if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4004 return Ok(HirScalarExpr::named_column(
4006 i,
4007 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4008 )
4009 .into());
4010 }
4011
4012 match e {
4013 Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4015 Ok(plan_identifier(ecx, names)?.into())
4016 }
4017
4018 Expr::Value(val) => plan_literal(val),
4020 Expr::Parameter(n) => plan_parameter(ecx, *n),
4021 Expr::Array(exprs) => plan_array(ecx, exprs, None),
4022 Expr::List(exprs) => plan_list(ecx, exprs, None),
4023 Expr::Map(exprs) => plan_map(ecx, exprs, None),
4024 Expr::Row { exprs } => plan_row(ecx, exprs),
4025
4026 Expr::Op { op, expr1, expr2 } => {
4028 Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4029 }
4030 Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4031 Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4032
4033 Expr::Not { expr } => plan_not(ecx, expr),
4035 Expr::And { left, right } => plan_and(ecx, left, right),
4036 Expr::Or { left, right } => plan_or(ecx, left, right),
4037 Expr::IsExpr {
4038 expr,
4039 construct,
4040 negated,
4041 } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4042 Expr::Case {
4043 operand,
4044 conditions,
4045 results,
4046 else_result,
4047 } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4048 Expr::HomogenizingFunction { function, exprs } => {
4049 plan_homogenizing_function(ecx, function, exprs)
4050 }
4051 Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4052 ecx,
4053 &None,
4054 &[l_expr.clone().equals(*r_expr.clone())],
4055 &[Expr::null()],
4056 &Some(Box::new(*l_expr.clone())),
4057 )?
4058 .into()),
4059 Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4060 Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4061 Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4062 Expr::Like {
4063 expr,
4064 pattern,
4065 escape,
4066 case_insensitive,
4067 negated,
4068 } => Ok(plan_like(
4069 ecx,
4070 expr,
4071 pattern,
4072 escape.as_deref(),
4073 *case_insensitive,
4074 *negated,
4075 )?
4076 .into()),
4077
4078 Expr::InList {
4079 expr,
4080 list,
4081 negated,
4082 } => plan_in_list(ecx, expr, list, negated),
4083
4084 Expr::Exists(query) => plan_exists(ecx, query),
4086 Expr::Subquery(query) => plan_subquery(ecx, query),
4087 Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4088 Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4089 Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4090 Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4091 Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
4092 Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
4093 Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
4094 Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
4095 Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
4096 Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
4097 Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
4098 }
4099}
4100
4101fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4102 if !ecx.allow_parameters {
4103 return Err(PlanError::UnknownParameter(n));
4107 }
4108 if n == 0 || n > 65536 {
4109 return Err(PlanError::UnknownParameter(n));
4110 }
4111 if ecx.param_types().borrow().contains_key(&n) {
4112 Ok(HirScalarExpr::parameter(n).into())
4113 } else {
4114 Ok(CoercibleScalarExpr::Parameter(n))
4115 }
4116}
4117
4118fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4119 let mut out = vec![];
4120 for e in exprs {
4121 out.push(plan_expr(ecx, e)?);
4122 }
4123 Ok(CoercibleScalarExpr::LiteralRecord(out))
4124}
4125
4126fn plan_cast(
4127 ecx: &ExprContext,
4128 expr: &Expr<Aug>,
4129 data_type: &ResolvedDataType,
4130) -> Result<CoercibleScalarExpr, PlanError> {
4131 let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4132 let expr = match expr {
4133 Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4142 Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4143 Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4144 _ => plan_expr(ecx, expr)?,
4145 };
4146 let ecx = &ecx.with_name("CAST");
4147 let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4148 let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4149 Ok(expr.into())
4150}
4151
4152fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4153 let ecx = ecx.with_name("NOT argument");
4154 Ok(plan_expr(&ecx, expr)?
4155 .type_as(&ecx, &SqlScalarType::Bool)?
4156 .call_unary(UnaryFunc::Not(expr_func::Not))
4157 .into())
4158}
4159
4160fn plan_and(
4161 ecx: &ExprContext,
4162 left: &Expr<Aug>,
4163 right: &Expr<Aug>,
4164) -> Result<CoercibleScalarExpr, PlanError> {
4165 let ecx = ecx.with_name("AND argument");
4166 Ok(HirScalarExpr::variadic_and(vec![
4167 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4168 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4169 ])
4170 .into())
4171}
4172
4173fn plan_or(
4174 ecx: &ExprContext,
4175 left: &Expr<Aug>,
4176 right: &Expr<Aug>,
4177) -> Result<CoercibleScalarExpr, PlanError> {
4178 let ecx = ecx.with_name("OR argument");
4179 Ok(HirScalarExpr::variadic_or(vec![
4180 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4181 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4182 ])
4183 .into())
4184}
4185
4186fn plan_in_list(
4187 ecx: &ExprContext,
4188 lhs: &Expr<Aug>,
4189 list: &Vec<Expr<Aug>>,
4190 negated: &bool,
4191) -> Result<CoercibleScalarExpr, PlanError> {
4192 let ecx = ecx.with_name("IN list");
4193 let or = HirScalarExpr::variadic_or(
4194 list.into_iter()
4195 .map(|e| {
4196 let eq = lhs.clone().equals(e.clone());
4197 plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4198 })
4199 .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4200 );
4201 Ok(if *negated {
4202 or.call_unary(UnaryFunc::Not(expr_func::Not))
4203 } else {
4204 or
4205 }
4206 .into())
4207}
4208
4209fn plan_homogenizing_function(
4210 ecx: &ExprContext,
4211 function: &HomogenizingFunction,
4212 exprs: &[Expr<Aug>],
4213) -> Result<CoercibleScalarExpr, PlanError> {
4214 assert!(!exprs.is_empty()); let expr = HirScalarExpr::call_variadic(
4216 match function {
4217 HomogenizingFunction::Coalesce => VariadicFunc::Coalesce,
4218 HomogenizingFunction::Greatest => VariadicFunc::Greatest,
4219 HomogenizingFunction::Least => VariadicFunc::Least,
4220 },
4221 coerce_homogeneous_exprs(
4222 &ecx.with_name(&function.to_string().to_lowercase()),
4223 plan_exprs(ecx, exprs)?,
4224 None,
4225 )?,
4226 );
4227 Ok(expr.into())
4228}
4229
4230fn plan_field_access(
4231 ecx: &ExprContext,
4232 expr: &Expr<Aug>,
4233 field: &Ident,
4234) -> Result<CoercibleScalarExpr, PlanError> {
4235 let field = normalize::column_name(field.clone());
4236 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4237 let ty = ecx.scalar_type(&expr);
4238 let i = match &ty {
4239 SqlScalarType::Record { fields, .. } => {
4240 fields.iter().position(|(name, _ty)| *name == field)
4241 }
4242 ty => sql_bail!(
4243 "column notation applied to type {}, which is not a composite type",
4244 ecx.humanize_scalar_type(ty, false)
4245 ),
4246 };
4247 match i {
4248 None => sql_bail!(
4249 "field {} not found in data type {}",
4250 field,
4251 ecx.humanize_scalar_type(&ty, false)
4252 ),
4253 Some(i) => Ok(expr
4254 .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4255 .into()),
4256 }
4257}
4258
4259fn plan_subscript(
4260 ecx: &ExprContext,
4261 expr: &Expr<Aug>,
4262 positions: &[SubscriptPosition<Aug>],
4263) -> Result<CoercibleScalarExpr, PlanError> {
4264 assert!(
4265 !positions.is_empty(),
4266 "subscript expression must contain at least one position"
4267 );
4268
4269 let ecx = &ecx.with_name("subscripting");
4270 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4271 let ty = ecx.scalar_type(&expr);
4272 match &ty {
4273 SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4274 ecx,
4275 expr,
4276 positions,
4277 if ty == SqlScalarType::Int2Vector {
4281 1
4282 } else {
4283 0
4284 },
4285 ),
4286 SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4287 SqlScalarType::List { element_type, .. } => {
4288 let elem_type_name = ecx.humanize_scalar_type(element_type, false);
4290 let n_layers = ty.unwrap_list_n_layers();
4291 plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4292 }
4293 ty => sql_bail!(
4294 "cannot subscript type {}",
4295 ecx.humanize_scalar_type(ty, false)
4296 ),
4297 }
4298}
4299
4300fn extract_scalar_subscript_from_positions<'a>(
4304 positions: &'a [SubscriptPosition<Aug>],
4305 expr_type_name: &str,
4306) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4307 let mut scalar_subscripts = Vec::with_capacity(positions.len());
4308 for p in positions {
4309 if p.explicit_slice {
4310 sql_bail!("{} subscript does not support slices", expr_type_name);
4311 }
4312 assert!(
4313 p.end.is_none(),
4314 "index-appearing subscripts cannot have end value"
4315 );
4316 scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4317 }
4318 Ok(scalar_subscripts)
4319}
4320
4321fn plan_subscript_array(
4322 ecx: &ExprContext,
4323 expr: HirScalarExpr,
4324 positions: &[SubscriptPosition<Aug>],
4325 offset: i64,
4326) -> Result<CoercibleScalarExpr, PlanError> {
4327 let mut exprs = Vec::with_capacity(positions.len() + 1);
4328 exprs.push(expr);
4329
4330 let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4333
4334 for i in indexes {
4335 exprs.push(plan_expr(ecx, i)?.cast_to(
4336 ecx,
4337 CastContext::Explicit,
4338 &SqlScalarType::Int64,
4339 )?);
4340 }
4341
4342 Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayIndex { offset }, exprs).into())
4343}
4344
4345fn plan_subscript_list(
4346 ecx: &ExprContext,
4347 mut expr: HirScalarExpr,
4348 positions: &[SubscriptPosition<Aug>],
4349 mut remaining_layers: usize,
4350 elem_type_name: &str,
4351) -> Result<CoercibleScalarExpr, PlanError> {
4352 let mut i = 0;
4353
4354 while i < positions.len() {
4355 let j = positions[i..]
4357 .iter()
4358 .position(|p| p.explicit_slice)
4359 .unwrap_or(positions.len() - i);
4360 if j != 0 {
4361 let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4362 let (n, e) = plan_index_list(
4363 ecx,
4364 expr,
4365 indexes.as_slice(),
4366 remaining_layers,
4367 elem_type_name,
4368 )?;
4369 remaining_layers = n;
4370 expr = e;
4371 i += j;
4372 }
4373
4374 let j = positions[i..]
4376 .iter()
4377 .position(|p| !p.explicit_slice)
4378 .unwrap_or(positions.len() - i);
4379 if j != 0 {
4380 expr = plan_slice_list(
4381 ecx,
4382 expr,
4383 &positions[i..i + j],
4384 remaining_layers,
4385 elem_type_name,
4386 )?;
4387 i += j;
4388 }
4389 }
4390
4391 Ok(expr.into())
4392}
4393
4394fn plan_index_list(
4395 ecx: &ExprContext,
4396 expr: HirScalarExpr,
4397 indexes: &[&Expr<Aug>],
4398 n_layers: usize,
4399 elem_type_name: &str,
4400) -> Result<(usize, HirScalarExpr), PlanError> {
4401 let depth = indexes.len();
4402
4403 if depth > n_layers {
4404 if n_layers == 0 {
4405 sql_bail!("cannot subscript type {}", elem_type_name)
4406 } else {
4407 sql_bail!(
4408 "cannot index into {} layers; list only has {} layer{}",
4409 depth,
4410 n_layers,
4411 if n_layers == 1 { "" } else { "s" }
4412 )
4413 }
4414 }
4415
4416 let mut exprs = Vec::with_capacity(depth + 1);
4417 exprs.push(expr);
4418
4419 for i in indexes {
4420 exprs.push(plan_expr(ecx, i)?.cast_to(
4421 ecx,
4422 CastContext::Explicit,
4423 &SqlScalarType::Int64,
4424 )?);
4425 }
4426
4427 Ok((
4428 n_layers - depth,
4429 HirScalarExpr::call_variadic(VariadicFunc::ListIndex, exprs),
4430 ))
4431}
4432
4433fn plan_slice_list(
4434 ecx: &ExprContext,
4435 expr: HirScalarExpr,
4436 slices: &[SubscriptPosition<Aug>],
4437 n_layers: usize,
4438 elem_type_name: &str,
4439) -> Result<HirScalarExpr, PlanError> {
4440 if n_layers == 0 {
4441 sql_bail!("cannot subscript type {}", elem_type_name)
4442 }
4443
4444 let mut exprs = Vec::with_capacity(slices.len() + 1);
4446 exprs.push(expr);
4447 let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4449 Ok(match position {
4450 Some(p) => {
4451 plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4452 }
4453 None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4454 })
4455 };
4456 for p in slices {
4457 let start = extract_position_or_default(p.start.as_ref(), 1)?;
4458 let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4459 exprs.push(start);
4460 exprs.push(end);
4461 }
4462
4463 Ok(HirScalarExpr::call_variadic(
4464 VariadicFunc::ListSliceLinear,
4465 exprs,
4466 ))
4467}
4468
4469fn plan_like(
4470 ecx: &ExprContext,
4471 expr: &Expr<Aug>,
4472 pattern: &Expr<Aug>,
4473 escape: Option<&Expr<Aug>>,
4474 case_insensitive: bool,
4475 not: bool,
4476) -> Result<HirScalarExpr, PlanError> {
4477 use CastContext::Implicit;
4478 let ecx = ecx.with_name("LIKE argument");
4479 let expr = plan_expr(&ecx, expr)?;
4480 let haystack = match ecx.scalar_type(&expr) {
4481 CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4482 .type_as(&ecx, ty)?
4483 .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4484 _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4485 };
4486 let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4487 if let Some(escape) = escape {
4488 pattern = pattern.call_binary(
4489 plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4490 expr_func::LikeEscape,
4491 );
4492 }
4493 let func: BinaryFunc = if case_insensitive {
4494 expr_func::IsLikeMatchCaseInsensitive.into()
4495 } else {
4496 expr_func::IsLikeMatchCaseSensitive.into()
4497 };
4498 let like = haystack.call_binary(pattern, func);
4499 if not {
4500 Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4501 } else {
4502 Ok(like)
4503 }
4504}
4505
4506fn plan_subscript_jsonb(
4507 ecx: &ExprContext,
4508 expr: HirScalarExpr,
4509 positions: &[SubscriptPosition<Aug>],
4510) -> Result<CoercibleScalarExpr, PlanError> {
4511 use CastContext::Implicit;
4512 use SqlScalarType::{Int64, String};
4513
4514 let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4517
4518 let mut exprs = Vec::with_capacity(subscripts.len());
4519 for s in subscripts {
4520 let subscript = plan_expr(ecx, s)?;
4521 let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4522 subscript
4523 } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4524 typeconv::to_string(ecx, subscript)
4528 } else {
4529 sql_bail!("jsonb subscript type must be coercible to integer or text");
4530 };
4531 exprs.push(subscript);
4532 }
4533
4534 let expr = expr.call_binary(
4537 HirScalarExpr::call_variadic(
4538 VariadicFunc::ArrayCreate {
4539 elem_type: SqlScalarType::String,
4540 },
4541 exprs,
4542 ),
4543 BinaryFunc::JsonbGetPath,
4544 );
4545 Ok(expr.into())
4546}
4547
4548fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4549 if !ecx.allow_subqueries {
4550 sql_bail!("{} does not allow subqueries", ecx.name)
4551 }
4552 let mut qcx = ecx.derived_query_context();
4553 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4554 Ok(expr.exists().into())
4555}
4556
4557fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4558 if !ecx.allow_subqueries {
4559 sql_bail!("{} does not allow subqueries", ecx.name)
4560 }
4561 let mut qcx = ecx.derived_query_context();
4562 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4563 let column_types = qcx.relation_type(&expr).column_types;
4564 if column_types.len() != 1 {
4565 sql_bail!(
4566 "Expected subselect to return 1 column, got {} columns",
4567 column_types.len()
4568 );
4569 }
4570 Ok(expr.select().into())
4571}
4572
4573fn plan_list_subquery(
4574 ecx: &ExprContext,
4575 query: &Query<Aug>,
4576) -> Result<CoercibleScalarExpr, PlanError> {
4577 plan_vector_like_subquery(
4578 ecx,
4579 query,
4580 |_| false,
4581 |elem_type| VariadicFunc::ListCreate { elem_type },
4582 |order_by| AggregateFunc::ListConcat { order_by },
4583 expr_func::ListListConcat.into(),
4584 |elem_type| {
4585 HirScalarExpr::literal(
4586 Datum::empty_list(),
4587 SqlScalarType::List {
4588 element_type: Box::new(elem_type),
4589 custom_id: None,
4590 },
4591 )
4592 },
4593 "list",
4594 )
4595}
4596
4597fn plan_array_subquery(
4598 ecx: &ExprContext,
4599 query: &Query<Aug>,
4600) -> Result<CoercibleScalarExpr, PlanError> {
4601 plan_vector_like_subquery(
4602 ecx,
4603 query,
4604 |elem_type| {
4605 matches!(
4606 elem_type,
4607 SqlScalarType::Char { .. }
4608 | SqlScalarType::Array { .. }
4609 | SqlScalarType::List { .. }
4610 | SqlScalarType::Map { .. }
4611 )
4612 },
4613 |elem_type| VariadicFunc::ArrayCreate { elem_type },
4614 |order_by| AggregateFunc::ArrayConcat { order_by },
4615 expr_func::ArrayArrayConcat.into(),
4616 |elem_type| {
4617 HirScalarExpr::literal(
4618 Datum::empty_array(),
4619 SqlScalarType::Array(Box::new(elem_type)),
4620 )
4621 },
4622 "[]",
4623 )
4624}
4625
4626fn plan_vector_like_subquery<F1, F2, F3, F4>(
4628 ecx: &ExprContext,
4629 query: &Query<Aug>,
4630 is_unsupported_type: F1,
4631 vector_create: F2,
4632 aggregate_concat: F3,
4633 binary_concat: BinaryFunc,
4634 empty_literal: F4,
4635 vector_type_string: &str,
4636) -> Result<CoercibleScalarExpr, PlanError>
4637where
4638 F1: Fn(&SqlScalarType) -> bool,
4639 F2: Fn(SqlScalarType) -> VariadicFunc,
4640 F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4641 F4: Fn(SqlScalarType) -> HirScalarExpr,
4642{
4643 if !ecx.allow_subqueries {
4644 sql_bail!("{} does not allow subqueries", ecx.name)
4645 }
4646
4647 let mut qcx = ecx.derived_query_context();
4648 let mut planned_query = plan_query(&mut qcx, query)?;
4649 if planned_query.limit.is_some()
4650 || !planned_query
4651 .offset
4652 .clone()
4653 .try_into_literal_int64()
4654 .is_ok_and(|offset| offset == 0)
4655 {
4656 planned_query.expr = HirRelationExpr::top_k(
4657 planned_query.expr,
4658 vec![],
4659 planned_query.order_by.clone(),
4660 planned_query.limit,
4661 planned_query.offset,
4662 planned_query.group_size_hints.limit_input_group_size,
4663 );
4664 }
4665
4666 if planned_query.project.len() != 1 {
4667 sql_bail!(
4668 "Expected subselect to return 1 column, got {} columns",
4669 planned_query.project.len()
4670 );
4671 }
4672
4673 let project_column = *planned_query.project.get(0).unwrap();
4674 let elem_type = qcx
4675 .relation_type(&planned_query.expr)
4676 .column_types
4677 .get(project_column)
4678 .cloned()
4679 .unwrap()
4680 .scalar_type();
4681
4682 if is_unsupported_type(&elem_type) {
4683 bail_unsupported!(format!(
4684 "cannot build array from subquery because return type {}{}",
4685 ecx.humanize_scalar_type(&elem_type, false),
4686 vector_type_string
4687 ));
4688 }
4689
4690 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4693 vector_create(elem_type.clone()),
4694 vec![HirScalarExpr::column(project_column)],
4695 ))
4696 .chain(
4697 planned_query
4698 .order_by
4699 .iter()
4700 .map(|co| HirScalarExpr::column(co.column)),
4701 )
4702 .collect();
4703
4704 let aggregation_projection = vec![0];
4708 let aggregation_order_by = planned_query
4709 .order_by
4710 .into_iter()
4711 .enumerate()
4712 .map(|(i, order)| ColumnOrder { column: i, ..order })
4713 .collect();
4714
4715 let reduced_expr = planned_query
4716 .expr
4717 .reduce(
4718 vec![],
4719 vec![AggregateExpr {
4720 func: aggregate_concat(aggregation_order_by),
4721 expr: Box::new(HirScalarExpr::call_variadic(
4722 VariadicFunc::RecordCreate {
4723 field_names: iter::repeat(ColumnName::from(""))
4724 .take(aggregation_exprs.len())
4725 .collect(),
4726 },
4727 aggregation_exprs,
4728 )),
4729 distinct: false,
4730 }],
4731 None,
4732 )
4733 .project(aggregation_projection);
4734
4735 Ok(reduced_expr
4737 .select()
4738 .call_binary(empty_literal(elem_type), binary_concat)
4739 .into())
4740}
4741
4742fn plan_map_subquery(
4743 ecx: &ExprContext,
4744 query: &Query<Aug>,
4745) -> Result<CoercibleScalarExpr, PlanError> {
4746 if !ecx.allow_subqueries {
4747 sql_bail!("{} does not allow subqueries", ecx.name)
4748 }
4749
4750 let mut qcx = ecx.derived_query_context();
4751 let mut query = plan_query(&mut qcx, query)?;
4752 if query.limit.is_some()
4753 || !query
4754 .offset
4755 .clone()
4756 .try_into_literal_int64()
4757 .is_ok_and(|offset| offset == 0)
4758 {
4759 query.expr = HirRelationExpr::top_k(
4760 query.expr,
4761 vec![],
4762 query.order_by.clone(),
4763 query.limit,
4764 query.offset,
4765 query.group_size_hints.limit_input_group_size,
4766 );
4767 }
4768 if query.project.len() != 2 {
4769 sql_bail!(
4770 "expected map subquery to return 2 columns, got {} columns",
4771 query.project.len()
4772 );
4773 }
4774
4775 let query_types = qcx.relation_type(&query.expr).column_types;
4776 let key_column = query.project[0];
4777 let key_type = query_types[key_column].clone().scalar_type();
4778 let value_column = query.project[1];
4779 let value_type = query_types[value_column].clone().scalar_type();
4780
4781 if key_type != SqlScalarType::String {
4782 sql_bail!("cannot build map from subquery because first column is not of type text");
4783 }
4784
4785 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4786 VariadicFunc::RecordCreate {
4787 field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4788 },
4789 vec![
4790 HirScalarExpr::column(key_column),
4791 HirScalarExpr::column(value_column),
4792 ],
4793 ))
4794 .chain(
4795 query
4796 .order_by
4797 .iter()
4798 .map(|co| HirScalarExpr::column(co.column)),
4799 )
4800 .collect();
4801
4802 let expr = query
4803 .expr
4804 .reduce(
4805 vec![],
4806 vec![AggregateExpr {
4807 func: AggregateFunc::MapAgg {
4808 order_by: query
4809 .order_by
4810 .into_iter()
4811 .enumerate()
4812 .map(|(i, order)| ColumnOrder { column: i, ..order })
4813 .collect(),
4814 value_type: value_type.clone(),
4815 },
4816 expr: Box::new(HirScalarExpr::call_variadic(
4817 VariadicFunc::RecordCreate {
4818 field_names: iter::repeat(ColumnName::from(""))
4819 .take(aggregation_exprs.len())
4820 .collect(),
4821 },
4822 aggregation_exprs,
4823 )),
4824 distinct: false,
4825 }],
4826 None,
4827 )
4828 .project(vec![0]);
4829
4830 let expr = HirScalarExpr::call_variadic(
4832 VariadicFunc::Coalesce,
4833 vec![
4834 expr.select(),
4835 HirScalarExpr::literal(
4836 Datum::empty_map(),
4837 SqlScalarType::Map {
4838 value_type: Box::new(value_type),
4839 custom_id: None,
4840 },
4841 ),
4842 ],
4843 );
4844
4845 Ok(expr.into())
4846}
4847
4848fn plan_collate(
4849 ecx: &ExprContext,
4850 expr: &Expr<Aug>,
4851 collation: &UnresolvedItemName,
4852) -> Result<CoercibleScalarExpr, PlanError> {
4853 if collation.0.len() == 2
4854 && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4855 && collation.0[1] == ident!("default")
4856 {
4857 plan_expr(ecx, expr)
4858 } else {
4859 bail_unsupported!("COLLATE");
4860 }
4861}
4862
4863fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4870where
4871 E: std::borrow::Borrow<Expr<Aug>>,
4872{
4873 let mut out = vec![];
4874 for expr in exprs {
4875 out.push(plan_expr(ecx, expr.borrow())?);
4876 }
4877 Ok(out)
4878}
4879
4880fn plan_array(
4882 ecx: &ExprContext,
4883 exprs: &[Expr<Aug>],
4884 type_hint: Option<&SqlScalarType>,
4885) -> Result<CoercibleScalarExpr, PlanError> {
4886 let mut out = vec![];
4888 for expr in exprs {
4889 out.push(match expr {
4890 Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4893 _ => plan_expr(ecx, expr)?,
4894 });
4895 }
4896
4897 let type_hint = match type_hint {
4899 Some(SqlScalarType::Array(elem_type)) => {
4904 let multidimensional = out.iter().any(|e| {
4905 matches!(
4906 ecx.scalar_type(e),
4907 CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4908 )
4909 });
4910 if multidimensional {
4911 type_hint
4912 } else {
4913 Some(&**elem_type)
4914 }
4915 }
4916 Some(_) => None,
4920 None => None,
4922 };
4923
4924 let (elem_type, exprs) = if exprs.is_empty() {
4926 if let Some(elem_type) = type_hint {
4927 (elem_type.clone(), vec![])
4928 } else {
4929 sql_bail!("cannot determine type of empty array");
4930 }
4931 } else {
4932 let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4933 (ecx.scalar_type(&out[0]), out)
4934 };
4935
4936 if matches!(
4942 elem_type,
4943 SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4944 ) {
4945 bail_unsupported!(format!("{}[]", ecx.humanize_scalar_type(&elem_type, false)));
4946 }
4947
4948 Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayCreate { elem_type }, exprs).into())
4949}
4950
4951fn plan_list(
4952 ecx: &ExprContext,
4953 exprs: &[Expr<Aug>],
4954 type_hint: Option<&SqlScalarType>,
4955) -> Result<CoercibleScalarExpr, PlanError> {
4956 let (elem_type, exprs) = if exprs.is_empty() {
4957 if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
4958 (element_type.without_modifiers(), vec![])
4959 } else {
4960 sql_bail!("cannot determine type of empty list");
4961 }
4962 } else {
4963 let type_hint = match type_hint {
4964 Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
4965 _ => None,
4966 };
4967
4968 let mut out = vec![];
4969 for expr in exprs {
4970 out.push(match expr {
4971 Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
4974 _ => plan_expr(ecx, expr)?,
4975 });
4976 }
4977 let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
4978 (ecx.scalar_type(&out[0]).without_modifiers(), out)
4979 };
4980
4981 if matches!(elem_type, SqlScalarType::Char { .. }) {
4982 bail_unsupported!("char list");
4983 }
4984
4985 Ok(HirScalarExpr::call_variadic(VariadicFunc::ListCreate { elem_type }, exprs).into())
4986}
4987
4988fn plan_map(
4989 ecx: &ExprContext,
4990 entries: &[MapEntry<Aug>],
4991 type_hint: Option<&SqlScalarType>,
4992) -> Result<CoercibleScalarExpr, PlanError> {
4993 let (value_type, exprs) = if entries.is_empty() {
4994 if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
4995 (value_type.without_modifiers(), vec![])
4996 } else {
4997 sql_bail!("cannot determine type of empty map");
4998 }
4999 } else {
5000 let type_hint = match type_hint {
5001 Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5002 _ => None,
5003 };
5004
5005 let mut keys = vec![];
5006 let mut values = vec![];
5007 for MapEntry { key, value } in entries {
5008 let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5009 let value = match value {
5010 Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5013 _ => plan_expr(ecx, value)?,
5014 };
5015 keys.push(key);
5016 values.push(value);
5017 }
5018 let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5019 let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5020 let out = itertools::interleave(keys, values).collect();
5021 (value_type, out)
5022 };
5023
5024 if matches!(value_type, SqlScalarType::Char { .. }) {
5025 bail_unsupported!("char map");
5026 }
5027
5028 let expr = HirScalarExpr::call_variadic(VariadicFunc::MapBuild { value_type }, exprs);
5029 Ok(expr.into())
5030}
5031
5032pub fn coerce_homogeneous_exprs(
5049 ecx: &ExprContext,
5050 exprs: Vec<CoercibleScalarExpr>,
5051 force_type: Option<&SqlScalarType>,
5052) -> Result<Vec<HirScalarExpr>, PlanError> {
5053 assert!(!exprs.is_empty());
5054
5055 let target_holder;
5056 let target = match force_type {
5057 Some(t) => t,
5058 None => {
5059 let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5060 target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5061 &target_holder
5062 }
5063 };
5064
5065 let mut out = Vec::new();
5067 for expr in exprs {
5068 let arg = typeconv::plan_coerce(ecx, expr, target)?;
5069 let ccx = match force_type {
5070 None => CastContext::Implicit,
5071 Some(_) => CastContext::Explicit,
5072 };
5073 match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5074 Ok(expr) => out.push(expr),
5075 Err(_) => sql_bail!(
5076 "{} could not convert type {} to {}",
5077 ecx.name,
5078 ecx.humanize_scalar_type(&ecx.scalar_type(&arg), false),
5079 ecx.humanize_scalar_type(target, false),
5080 ),
5081 }
5082 }
5083 Ok(out)
5084}
5085
5086pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5089 obe: &OrderByExpr<T>,
5090 column: usize,
5091) -> ColumnOrder {
5092 let desc = !obe.asc.unwrap_or(true);
5093 ColumnOrder {
5094 column,
5095 desc,
5096 nulls_last: obe.nulls_last.unwrap_or(!desc),
5099 }
5100}
5101
5102fn plan_function_order_by(
5110 ecx: &ExprContext,
5111 order_by: &[OrderByExpr<Aug>],
5112) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5113 let mut order_by_exprs = vec![];
5114 let mut col_orders = vec![];
5115 {
5116 for (i, obe) in order_by.iter().enumerate() {
5117 let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5121 order_by_exprs.push(expr);
5122 col_orders.push(resolve_desc_and_nulls_last(obe, i));
5123 }
5124 }
5125 Ok((order_by_exprs, col_orders))
5126}
5127
5128fn plan_aggregate_common(
5130 ecx: &ExprContext,
5131 Function::<Aug> {
5132 name,
5133 args,
5134 filter,
5135 over: _,
5136 distinct,
5137 }: &Function<Aug>,
5138) -> Result<AggregateExpr, PlanError> {
5139 let impls = match resolve_func(ecx, name, args)? {
5154 Func::Aggregate(impls) => impls,
5155 _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
5156 };
5157
5158 let (args, order_by) = match &args {
5167 FunctionArgs::Star => (vec![], vec![]),
5168 FunctionArgs::Args { args, order_by } => {
5169 if args.is_empty() {
5170 sql_bail!(
5171 "{}(*) must be used to call a parameterless aggregate function",
5172 ecx.qcx
5173 .scx
5174 .humanize_resolved_name(name)
5175 .expect("name actually resolved")
5176 );
5177 }
5178 let args = plan_exprs(ecx, args)?;
5179 (args, order_by.clone())
5180 }
5181 };
5182
5183 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5184
5185 let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5186 if let Some(filter) = &filter {
5187 let cond =
5197 plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5198 let expr_typ = ecx.scalar_type(&expr);
5199 expr = HirScalarExpr::if_then_else(
5200 cond,
5201 expr,
5202 HirScalarExpr::literal(func.identity_datum(), expr_typ),
5203 );
5204 }
5205
5206 let mut seen_outer = false;
5207 let mut seen_inner = false;
5208 #[allow(deprecated)]
5209 expr.visit_columns(0, &mut |depth, col| {
5210 if depth == 0 && col.level == 0 {
5211 seen_inner = true;
5212 } else if col.level > depth {
5213 seen_outer = true;
5214 }
5215 });
5216 if seen_outer && !seen_inner {
5217 bail_unsupported!(
5218 3720,
5219 "aggregate functions that refer exclusively to outer columns"
5220 );
5221 }
5222
5223 if func.is_order_sensitive() {
5226 let field_names = iter::repeat(ColumnName::from(""))
5227 .take(1 + order_by_exprs.len())
5228 .collect();
5229 let mut exprs = vec![expr];
5230 exprs.extend(order_by_exprs);
5231 expr = HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs);
5232 }
5233
5234 Ok(AggregateExpr {
5235 func,
5236 expr: Box::new(expr),
5237 distinct: *distinct,
5238 })
5239}
5240
5241fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
5242 let mut names = names.to_vec();
5243 let col_name = normalize::column_name(names.pop().unwrap());
5244
5245 if !names.is_empty() {
5247 let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
5248 let (i, i_name) = ecx.scope.resolve_table_column(
5249 &ecx.qcx.outer_scopes,
5250 &table_name,
5251 &col_name,
5252 &mut ecx.qcx.name_manager.borrow_mut(),
5253 )?;
5254 return Ok(HirScalarExpr::named_column(i, i_name));
5255 }
5256
5257 let similar_names = match ecx.scope.resolve_column(
5260 &ecx.qcx.outer_scopes,
5261 &col_name,
5262 &mut ecx.qcx.name_manager.borrow_mut(),
5263 ) {
5264 Ok((i, i_name)) => {
5265 return Ok(HirScalarExpr::named_column(i, i_name));
5266 }
5267 Err(PlanError::UnknownColumn { similar, .. }) => similar,
5268 Err(e) => return Err(e),
5269 };
5270
5271 let items = ecx.scope.items_from_table(
5274 &ecx.qcx.outer_scopes,
5275 &PartialItemName {
5276 database: None,
5277 schema: None,
5278 item: col_name.as_str().to_owned(),
5279 },
5280 )?;
5281 match items.as_slice() {
5282 [] => Err(PlanError::UnknownColumn {
5284 table: None,
5285 column: col_name,
5286 similar: similar_names,
5287 }),
5288 [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
5293 *column,
5294 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5295 )),
5296 _ => {
5299 let mut has_exists_column = None;
5300 let (exprs, field_names): (Vec<_>, Vec<_>) = items
5301 .into_iter()
5302 .filter_map(|(column, item)| {
5303 if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
5304 has_exists_column = Some(column);
5305 None
5306 } else {
5307 let expr = HirScalarExpr::named_column(
5308 column,
5309 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5310 );
5311 let name = item.column_name.clone();
5312 Some((expr, name))
5313 }
5314 })
5315 .unzip();
5316 let expr = if exprs.len() == 1 && has_exists_column.is_some() {
5318 exprs.into_element()
5319 } else {
5320 HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs)
5321 };
5322 if let Some(has_exists_column) = has_exists_column {
5323 Ok(HirScalarExpr::if_then_else(
5324 HirScalarExpr::unnamed_column(has_exists_column)
5325 .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
5326 HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
5327 expr,
5328 ))
5329 } else {
5330 Ok(expr)
5331 }
5332 }
5333 }
5334}
5335
5336fn plan_op(
5337 ecx: &ExprContext,
5338 op: &str,
5339 expr1: &Expr<Aug>,
5340 expr2: Option<&Expr<Aug>>,
5341) -> Result<HirScalarExpr, PlanError> {
5342 let impls = func::resolve_op(op)?;
5343 let args = match expr2 {
5344 None => plan_exprs(ecx, &[expr1])?,
5345 Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5346 };
5347 func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5348}
5349
5350fn plan_function<'a>(
5351 ecx: &ExprContext,
5352 f @ Function {
5353 name,
5354 args,
5355 filter,
5356 over,
5357 distinct,
5358 }: &'a Function<Aug>,
5359) -> Result<HirScalarExpr, PlanError> {
5360 let impls = match resolve_func(ecx, name, args)? {
5361 Func::Table(_) => {
5362 sql_bail!(
5363 "table functions are not allowed in {} (function {})",
5364 ecx.name,
5365 name
5366 );
5367 }
5368 Func::Scalar(impls) => {
5369 if over.is_some() {
5370 sql_bail!(
5371 "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
5372 );
5373 }
5374 impls
5375 }
5376 Func::ScalarWindow(impls) => {
5377 let (
5378 ignore_nulls,
5379 order_by_exprs,
5380 col_orders,
5381 _window_frame,
5382 partition_by,
5383 scalar_args,
5384 ) = plan_window_function_non_aggr(ecx, f)?;
5385
5386 if !scalar_args.is_empty() {
5390 if let ResolvedItemName::Item {
5391 full_name: FullItemName { item, .. },
5392 ..
5393 } = name
5394 {
5395 sql_bail!(
5396 "function {} has 0 parameters, but was called with {}",
5397 item,
5398 scalar_args.len()
5399 );
5400 }
5401 }
5402
5403 let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5408
5409 if ignore_nulls {
5410 bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5413 }
5414
5415 return Ok(HirScalarExpr::windowing(WindowExpr {
5416 func: WindowExprType::Scalar(ScalarWindowExpr {
5417 func,
5418 order_by: col_orders,
5419 }),
5420 partition_by,
5421 order_by: order_by_exprs,
5422 }));
5423 }
5424 Func::ValueWindow(impls) => {
5425 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, scalar_args) =
5426 plan_window_function_non_aggr(ecx, f)?;
5427
5428 let (args_encoded, func) =
5429 func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5430
5431 if ignore_nulls {
5432 match func {
5433 ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
5434 _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
5435 }
5436 }
5437
5438 return Ok(HirScalarExpr::windowing(WindowExpr {
5439 func: WindowExprType::Value(ValueWindowExpr {
5440 func,
5441 args: Box::new(args_encoded),
5442 order_by: col_orders,
5443 window_frame,
5444 ignore_nulls, }),
5446 partition_by,
5447 order_by: order_by_exprs,
5448 }));
5449 }
5450 Func::Aggregate(_) => {
5451 if f.over.is_none() {
5452 if ecx.allow_aggregates {
5454 sql_bail!(
5457 "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
5458 name,
5459 );
5460 } else {
5461 sql_bail!(
5464 "aggregate functions are not allowed in {} (function {})",
5465 ecx.name,
5466 name
5467 );
5468 }
5469 } else {
5470 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
5471 plan_window_function_common(ecx, &f.name, &f.over)?;
5472
5473 match (&window_frame.start_bound, &window_frame.end_bound) {
5475 (
5476 mz_expr::WindowFrameBound::UnboundedPreceding,
5477 mz_expr::WindowFrameBound::OffsetPreceding(..),
5478 )
5479 | (
5480 mz_expr::WindowFrameBound::UnboundedPreceding,
5481 mz_expr::WindowFrameBound::OffsetFollowing(..),
5482 )
5483 | (
5484 mz_expr::WindowFrameBound::OffsetPreceding(..),
5485 mz_expr::WindowFrameBound::UnboundedFollowing,
5486 )
5487 | (
5488 mz_expr::WindowFrameBound::OffsetFollowing(..),
5489 mz_expr::WindowFrameBound::UnboundedFollowing,
5490 ) => bail_unsupported!("mixed unbounded - offset frames"),
5491 (_, _) => {} }
5493
5494 if ignore_nulls {
5495 bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5499 }
5500
5501 let aggregate_expr = plan_aggregate_common(ecx, f)?;
5502
5503 if aggregate_expr.distinct {
5504 bail_unsupported!("DISTINCT in window aggregates");
5506 }
5507
5508 return Ok(HirScalarExpr::windowing(WindowExpr {
5509 func: WindowExprType::Aggregate(AggregateWindowExpr {
5510 aggregate_expr,
5511 order_by: col_orders,
5512 window_frame,
5513 }),
5514 partition_by,
5515 order_by: order_by_exprs,
5516 }));
5517 }
5518 }
5519 };
5520
5521 if over.is_some() {
5522 unreachable!("If there is an OVER clause, we should have returned already above.");
5523 }
5524
5525 if *distinct {
5526 sql_bail!(
5527 "DISTINCT specified, but {} is not an aggregate function",
5528 ecx.qcx
5529 .scx
5530 .humanize_resolved_name(name)
5531 .expect("already resolved")
5532 );
5533 }
5534 if filter.is_some() {
5535 sql_bail!(
5536 "FILTER specified, but {} is not an aggregate function",
5537 ecx.qcx
5538 .scx
5539 .humanize_resolved_name(name)
5540 .expect("already resolved")
5541 );
5542 }
5543
5544 let scalar_args = match &args {
5545 FunctionArgs::Star => {
5546 sql_bail!(
5547 "* argument is invalid with non-aggregate function {}",
5548 ecx.qcx
5549 .scx
5550 .humanize_resolved_name(name)
5551 .expect("already resolved")
5552 )
5553 }
5554 FunctionArgs::Args { args, order_by } => {
5555 if !order_by.is_empty() {
5556 sql_bail!(
5557 "ORDER BY specified, but {} is not an aggregate function",
5558 ecx.qcx
5559 .scx
5560 .humanize_resolved_name(name)
5561 .expect("already resolved")
5562 );
5563 }
5564 plan_exprs(ecx, args)?
5565 }
5566 };
5567
5568 func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
5569}
5570
5571pub const IGNORE_NULLS_ERROR_MSG: &str =
5572 "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5573
5574pub fn resolve_func(
5578 ecx: &ExprContext,
5579 name: &ResolvedItemName,
5580 args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5581) -> Result<&'static Func, PlanError> {
5582 if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5583 if let Ok(f) = i.func() {
5584 return Ok(f);
5585 }
5586 }
5587
5588 let cexprs = match args {
5591 mz_sql_parser::ast::FunctionArgs::Star => vec![],
5592 mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5593 if !order_by.is_empty() {
5594 sql_bail!(
5595 "ORDER BY specified, but {} is not an aggregate function",
5596 name
5597 );
5598 }
5599 plan_exprs(ecx, args)?
5600 }
5601 };
5602
5603 let arg_types: Vec<_> = cexprs
5604 .into_iter()
5605 .map(|ty| match ecx.scalar_type(&ty) {
5606 CoercibleScalarType::Coerced(ty) => ecx.humanize_scalar_type(&ty, false),
5607 CoercibleScalarType::Record(_) => "record".to_string(),
5608 CoercibleScalarType::Uncoerced => "unknown".to_string(),
5609 })
5610 .collect();
5611
5612 Err(PlanError::UnknownFunction {
5613 name: name.to_string(),
5614 arg_types,
5615 })
5616}
5617
5618fn plan_is_expr<'a>(
5619 ecx: &ExprContext,
5620 expr: &'a Expr<Aug>,
5621 construct: &IsExprConstruct<Aug>,
5622 not: bool,
5623) -> Result<HirScalarExpr, PlanError> {
5624 let expr = plan_expr(ecx, expr)?;
5625 let mut expr = match construct {
5626 IsExprConstruct::Null => {
5627 let expr = expr.type_as_any(ecx)?;
5632 expr.call_is_null()
5633 }
5634 IsExprConstruct::Unknown => {
5635 let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5636 expr.call_is_null()
5637 }
5638 IsExprConstruct::True => {
5639 let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5640 expr.call_unary(UnaryFunc::IsTrue(expr_func::IsTrue))
5641 }
5642 IsExprConstruct::False => {
5643 let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5644 expr.call_unary(UnaryFunc::IsFalse(expr_func::IsFalse))
5645 }
5646 IsExprConstruct::DistinctFrom(expr2) => {
5647 let expr1 = expr.type_as_any(ecx)?;
5648 let expr2 = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5649 let term1 = HirScalarExpr::variadic_or(vec![
5656 expr1.clone().call_binary(expr2.clone(), expr_func::NotEq),
5657 expr1.clone().call_is_null(),
5658 expr2.clone().call_is_null(),
5659 ]);
5660 let term2 = HirScalarExpr::variadic_or(vec![
5661 expr1.call_is_null().not(),
5662 expr2.call_is_null().not(),
5663 ]);
5664 term1.and(term2)
5665 }
5666 };
5667 if not {
5668 expr = expr.not();
5669 }
5670 Ok(expr)
5671}
5672
5673fn plan_case<'a>(
5674 ecx: &ExprContext,
5675 operand: &'a Option<Box<Expr<Aug>>>,
5676 conditions: &'a [Expr<Aug>],
5677 results: &'a [Expr<Aug>],
5678 else_result: &'a Option<Box<Expr<Aug>>>,
5679) -> Result<HirScalarExpr, PlanError> {
5680 let mut cond_exprs = Vec::new();
5681 let mut result_exprs = Vec::new();
5682 for (c, r) in conditions.iter().zip_eq(results) {
5683 let c = match operand {
5684 Some(operand) => operand.clone().equals(c.clone()),
5685 None => c.clone(),
5686 };
5687 let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5688 cond_exprs.push(cexpr);
5689 result_exprs.push(r);
5690 }
5691 result_exprs.push(match else_result {
5692 Some(else_result) => else_result,
5693 None => &Expr::Value(Value::Null),
5694 });
5695 let mut result_exprs = coerce_homogeneous_exprs(
5696 &ecx.with_name("CASE"),
5697 plan_exprs(ecx, &result_exprs)?,
5698 None,
5699 )?;
5700 let mut expr = result_exprs.pop().unwrap();
5701 assert_eq!(cond_exprs.len(), result_exprs.len());
5702 for (cexpr, rexpr) in cond_exprs
5703 .into_iter()
5704 .rev()
5705 .zip_eq(result_exprs.into_iter().rev())
5706 {
5707 expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5708 }
5709 Ok(expr)
5710}
5711
5712fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5713 let (datum, scalar_type) = match l {
5714 Value::Number(s) => {
5715 let d = strconv::parse_numeric(s.as_str())?;
5716 if !s.contains(&['E', '.'][..]) {
5717 if let Ok(n) = d.0.try_into() {
5719 (Datum::Int32(n), SqlScalarType::Int32)
5720 } else if let Ok(n) = d.0.try_into() {
5721 (Datum::Int64(n), SqlScalarType::Int64)
5722 } else {
5723 (
5724 Datum::Numeric(d),
5725 SqlScalarType::Numeric { max_scale: None },
5726 )
5727 }
5728 } else {
5729 (
5730 Datum::Numeric(d),
5731 SqlScalarType::Numeric { max_scale: None },
5732 )
5733 }
5734 }
5735 Value::HexString(_) => bail_unsupported!("hex string literals"),
5736 Value::Boolean(b) => match b {
5737 false => (Datum::False, SqlScalarType::Bool),
5738 true => (Datum::True, SqlScalarType::Bool),
5739 },
5740 Value::Interval(i) => {
5741 let i = literal::plan_interval(i)?;
5742 (Datum::Interval(i), SqlScalarType::Interval)
5743 }
5744 Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5745 Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5746 };
5747 let expr = HirScalarExpr::literal(datum, scalar_type);
5748 Ok(expr.into())
5749}
5750
5751fn plan_window_function_non_aggr<'a>(
5754 ecx: &ExprContext,
5755 Function {
5756 name,
5757 args,
5758 filter,
5759 over,
5760 distinct,
5761 }: &'a Function<Aug>,
5762) -> Result<
5763 (
5764 bool,
5765 Vec<HirScalarExpr>,
5766 Vec<ColumnOrder>,
5767 mz_expr::WindowFrame,
5768 Vec<HirScalarExpr>,
5769 Vec<CoercibleScalarExpr>,
5770 ),
5771 PlanError,
5772> {
5773 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5774 plan_window_function_common(ecx, name, over)?;
5775
5776 if *distinct {
5777 sql_bail!(
5778 "DISTINCT specified, but {} is not an aggregate function",
5779 name
5780 );
5781 }
5782
5783 if filter.is_some() {
5784 bail_unsupported!("FILTER in non-aggregate window functions");
5785 }
5786
5787 let scalar_args = match &args {
5788 FunctionArgs::Star => {
5789 sql_bail!("* argument is invalid with non-aggregate function {}", name)
5790 }
5791 FunctionArgs::Args { args, order_by } => {
5792 if !order_by.is_empty() {
5793 sql_bail!(
5794 "ORDER BY specified, but {} is not an aggregate function",
5795 name
5796 );
5797 }
5798 plan_exprs(ecx, args)?
5799 }
5800 };
5801
5802 Ok((
5803 ignore_nulls,
5804 order_by_exprs,
5805 col_orders,
5806 window_frame,
5807 partition,
5808 scalar_args,
5809 ))
5810}
5811
5812fn plan_window_function_common(
5814 ecx: &ExprContext,
5815 name: &<Aug as AstInfo>::ItemName,
5816 over: &Option<WindowSpec<Aug>>,
5817) -> Result<
5818 (
5819 bool,
5820 Vec<HirScalarExpr>,
5821 Vec<ColumnOrder>,
5822 mz_expr::WindowFrame,
5823 Vec<HirScalarExpr>,
5824 ),
5825 PlanError,
5826> {
5827 if !ecx.allow_windows {
5828 sql_bail!(
5829 "window functions are not allowed in {} (function {})",
5830 ecx.name,
5831 name
5832 );
5833 }
5834
5835 let window_spec = match over.as_ref() {
5836 Some(over) => over,
5837 None => sql_bail!("window function {} requires an OVER clause", name),
5838 };
5839 if window_spec.ignore_nulls && window_spec.respect_nulls {
5840 sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5841 }
5842 let window_frame = match window_spec.window_frame.as_ref() {
5843 Some(frame) => plan_window_frame(frame)?,
5844 None => mz_expr::WindowFrame::default(),
5845 };
5846 let mut partition = Vec::new();
5847 for expr in &window_spec.partition_by {
5848 partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5849 }
5850
5851 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5852
5853 Ok((
5854 window_spec.ignore_nulls,
5855 order_by_exprs,
5856 col_orders,
5857 window_frame,
5858 partition,
5859 ))
5860}
5861
5862fn plan_window_frame(
5863 WindowFrame {
5864 units,
5865 start_bound,
5866 end_bound,
5867 }: &WindowFrame,
5868) -> Result<mz_expr::WindowFrame, PlanError> {
5869 use mz_expr::WindowFrameBound::*;
5870 let units = window_frame_unit_ast_to_expr(units)?;
5871 let start_bound = window_frame_bound_ast_to_expr(start_bound);
5872 let end_bound = end_bound
5873 .as_ref()
5874 .map(window_frame_bound_ast_to_expr)
5875 .unwrap_or(CurrentRow);
5876
5877 match (&start_bound, &end_bound) {
5879 (UnboundedFollowing, _) => {
5881 sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5882 }
5883 (_, UnboundedPreceding) => {
5885 sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5886 }
5887 (CurrentRow, OffsetPreceding(_)) => {
5889 sql_bail!("frame starting from current row cannot have preceding rows")
5890 }
5891 (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5892 sql_bail!("frame starting from following row cannot have preceding rows")
5893 }
5894 (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5897 if *o1 > 1000000 || *o2 > 1000000 {
5901 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5902 }
5903 }
5904 (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5905 if *o1 > 1000000 || *o2 > 1000000 {
5906 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5907 }
5908 }
5909 (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5910 if *o1 > 1000000 || *o2 > 1000000 {
5911 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5912 }
5913 }
5914 (OffsetPreceding(o), CurrentRow) => {
5915 if *o > 1000000 {
5916 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5917 }
5918 }
5919 (CurrentRow, OffsetFollowing(o)) => {
5920 if *o > 1000000 {
5921 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5922 }
5923 }
5924 (_, _) => (),
5926 }
5927
5928 if units == mz_expr::WindowFrameUnits::Range
5931 && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5932 {
5933 bail_unsupported!("RANGE in non-default window frames")
5934 }
5935
5936 let frame = mz_expr::WindowFrame {
5937 units,
5938 start_bound,
5939 end_bound,
5940 };
5941 Ok(frame)
5942}
5943
5944fn window_frame_unit_ast_to_expr(
5945 unit: &WindowFrameUnits,
5946) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5947 match unit {
5948 WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5949 WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5950 WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5951 }
5952}
5953
5954fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5955 match bound {
5956 WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5957 WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5958 WindowFrameBound::Preceding(Some(offset)) => {
5959 mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5960 }
5961 WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
5962 WindowFrameBound::Following(Some(offset)) => {
5963 mz_expr::WindowFrameBound::OffsetFollowing(*offset)
5964 }
5965 }
5966}
5967
5968pub fn scalar_type_from_sql(
5969 scx: &StatementContext,
5970 data_type: &ResolvedDataType,
5971) -> Result<SqlScalarType, PlanError> {
5972 match data_type {
5973 ResolvedDataType::AnonymousList(elem_type) => {
5974 let elem_type = scalar_type_from_sql(scx, elem_type)?;
5975 if matches!(elem_type, SqlScalarType::Char { .. }) {
5976 bail_unsupported!("char list");
5977 }
5978 Ok(SqlScalarType::List {
5979 element_type: Box::new(elem_type),
5980 custom_id: None,
5981 })
5982 }
5983 ResolvedDataType::AnonymousMap {
5984 key_type,
5985 value_type,
5986 } => {
5987 match scalar_type_from_sql(scx, key_type)? {
5988 SqlScalarType::String => {}
5989 other => sql_bail!(
5990 "map key type must be {}, got {}",
5991 scx.humanize_scalar_type(&SqlScalarType::String, false),
5992 scx.humanize_scalar_type(&other, false)
5993 ),
5994 }
5995 Ok(SqlScalarType::Map {
5996 value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
5997 custom_id: None,
5998 })
5999 }
6000 ResolvedDataType::Named { id, modifiers, .. } => {
6001 scalar_type_from_catalog(scx.catalog, *id, modifiers)
6002 }
6003 ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
6004 }
6005}
6006
6007pub fn scalar_type_from_catalog(
6008 catalog: &dyn SessionCatalog,
6009 id: CatalogItemId,
6010 modifiers: &[i64],
6011) -> Result<SqlScalarType, PlanError> {
6012 let entry = catalog.get_item(&id);
6013 let type_details = match entry.type_details() {
6014 Some(type_details) => type_details,
6015 None => {
6016 sql_bail!(
6019 "internal error: {} does not refer to a type",
6020 catalog.resolve_full_name(entry.name()).to_string().quoted()
6021 );
6022 }
6023 };
6024 match &type_details.typ {
6025 CatalogType::Numeric => {
6026 let mut modifiers = modifiers.iter().fuse();
6027 let precision = match modifiers.next() {
6028 Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6029 sql_bail!(
6030 "precision for type numeric must be between 1 and {}",
6031 NUMERIC_DATUM_MAX_PRECISION,
6032 );
6033 }
6034 Some(p) => Some(*p),
6035 None => None,
6036 };
6037 let scale = match modifiers.next() {
6038 Some(scale) => {
6039 if let Some(precision) = precision {
6040 if *scale > precision {
6041 sql_bail!(
6042 "scale for type numeric must be between 0 and precision {}",
6043 precision
6044 );
6045 }
6046 }
6047 Some(NumericMaxScale::try_from(*scale)?)
6048 }
6049 None => None,
6050 };
6051 if modifiers.next().is_some() {
6052 sql_bail!("type numeric supports at most two type modifiers");
6053 }
6054 Ok(SqlScalarType::Numeric { max_scale: scale })
6055 }
6056 CatalogType::Char => {
6057 let mut modifiers = modifiers.iter().fuse();
6058 let length = match modifiers.next() {
6059 Some(l) => Some(CharLength::try_from(*l)?),
6060 None => Some(CharLength::ONE),
6061 };
6062 if modifiers.next().is_some() {
6063 sql_bail!("type character supports at most one type modifier");
6064 }
6065 Ok(SqlScalarType::Char { length })
6066 }
6067 CatalogType::VarChar => {
6068 let mut modifiers = modifiers.iter().fuse();
6069 let length = match modifiers.next() {
6070 Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6071 None => None,
6072 };
6073 if modifiers.next().is_some() {
6074 sql_bail!("type character varying supports at most one type modifier");
6075 }
6076 Ok(SqlScalarType::VarChar { max_length: length })
6077 }
6078 CatalogType::Timestamp => {
6079 let mut modifiers = modifiers.iter().fuse();
6080 let precision = match modifiers.next() {
6081 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6082 None => None,
6083 };
6084 if modifiers.next().is_some() {
6085 sql_bail!("type timestamp supports at most one type modifier");
6086 }
6087 Ok(SqlScalarType::Timestamp { precision })
6088 }
6089 CatalogType::TimestampTz => {
6090 let mut modifiers = modifiers.iter().fuse();
6091 let precision = match modifiers.next() {
6092 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6093 None => None,
6094 };
6095 if modifiers.next().is_some() {
6096 sql_bail!("type timestamp with time zone supports at most one type modifier");
6097 }
6098 Ok(SqlScalarType::TimestampTz { precision })
6099 }
6100 t => {
6101 if !modifiers.is_empty() {
6102 sql_bail!(
6103 "{} does not support type modifiers",
6104 catalog.resolve_full_name(entry.name()).to_string()
6105 );
6106 }
6107 match t {
6108 CatalogType::Array {
6109 element_reference: element_id,
6110 } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6111 catalog,
6112 *element_id,
6113 modifiers,
6114 )?))),
6115 CatalogType::List {
6116 element_reference: element_id,
6117 element_modifiers,
6118 } => Ok(SqlScalarType::List {
6119 element_type: Box::new(scalar_type_from_catalog(
6120 catalog,
6121 *element_id,
6122 element_modifiers,
6123 )?),
6124 custom_id: Some(id),
6125 }),
6126 CatalogType::Map {
6127 key_reference: _,
6128 key_modifiers: _,
6129 value_reference: value_id,
6130 value_modifiers,
6131 } => Ok(SqlScalarType::Map {
6132 value_type: Box::new(scalar_type_from_catalog(
6133 catalog,
6134 *value_id,
6135 value_modifiers,
6136 )?),
6137 custom_id: Some(id),
6138 }),
6139 CatalogType::Range {
6140 element_reference: element_id,
6141 } => Ok(SqlScalarType::Range {
6142 element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6143 }),
6144 CatalogType::Record { fields } => {
6145 let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6146 .iter()
6147 .map(|f| {
6148 let scalar_type = scalar_type_from_catalog(
6149 catalog,
6150 f.type_reference,
6151 &f.type_modifiers,
6152 )?;
6153 Ok((
6154 f.name.clone(),
6155 SqlColumnType {
6156 scalar_type,
6157 nullable: true,
6158 },
6159 ))
6160 })
6161 .collect::<Result<Box<_>, PlanError>>()?;
6162 Ok(SqlScalarType::Record {
6163 fields: scalars,
6164 custom_id: Some(id),
6165 })
6166 }
6167 CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6168 CatalogType::Bool => Ok(SqlScalarType::Bool),
6169 CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6170 CatalogType::Date => Ok(SqlScalarType::Date),
6171 CatalogType::Float32 => Ok(SqlScalarType::Float32),
6172 CatalogType::Float64 => Ok(SqlScalarType::Float64),
6173 CatalogType::Int16 => Ok(SqlScalarType::Int16),
6174 CatalogType::Int32 => Ok(SqlScalarType::Int32),
6175 CatalogType::Int64 => Ok(SqlScalarType::Int64),
6176 CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6177 CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6178 CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6179 CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6180 CatalogType::Interval => Ok(SqlScalarType::Interval),
6181 CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6182 CatalogType::Oid => Ok(SqlScalarType::Oid),
6183 CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6184 CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6185 CatalogType::Pseudo => {
6186 sql_bail!(
6187 "cannot reference pseudo type {}",
6188 catalog.resolve_full_name(entry.name()).to_string()
6189 )
6190 }
6191 CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6192 CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6193 CatalogType::RegType => Ok(SqlScalarType::RegType),
6194 CatalogType::String => Ok(SqlScalarType::String),
6195 CatalogType::Time => Ok(SqlScalarType::Time),
6196 CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6197 CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6198 CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6199 CatalogType::Numeric => unreachable!("handled above"),
6200 CatalogType::Char => unreachable!("handled above"),
6201 CatalogType::VarChar => unreachable!("handled above"),
6202 CatalogType::Timestamp => unreachable!("handled above"),
6203 CatalogType::TimestampTz => unreachable!("handled above"),
6204 }
6205 }
6206 }
6207}
6208
6209struct AggregateTableFuncVisitor<'a> {
6212 scx: &'a StatementContext<'a>,
6213 aggs: Vec<Function<Aug>>,
6214 within_aggregate: bool,
6215 tables: BTreeMap<Function<Aug>, String>,
6216 table_disallowed_context: Vec<&'static str>,
6217 in_select_item: bool,
6218 id_gen: IdGen,
6219 err: Option<PlanError>,
6220}
6221
6222impl<'a> AggregateTableFuncVisitor<'a> {
6223 fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6224 AggregateTableFuncVisitor {
6225 scx,
6226 aggs: Vec::new(),
6227 within_aggregate: false,
6228 tables: BTreeMap::new(),
6229 table_disallowed_context: Vec::new(),
6230 in_select_item: false,
6231 id_gen: Default::default(),
6232 err: None,
6233 }
6234 }
6235
6236 fn into_result(
6237 self,
6238 ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6239 match self.err {
6240 Some(err) => Err(err),
6241 None => {
6242 let mut seen = BTreeSet::new();
6245 let aggs = self
6246 .aggs
6247 .into_iter()
6248 .filter(move |agg| seen.insert(agg.clone()))
6249 .collect();
6250 Ok((aggs, self.tables))
6251 }
6252 }
6253 }
6254}
6255
6256impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
6257 fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
6258 let item = match self.scx.get_item_by_resolved_name(&func.name) {
6259 Ok(i) => i,
6260 Err(_) => return,
6262 };
6263
6264 match item.func() {
6265 Ok(Func::Aggregate { .. }) if func.over.is_none() => {
6268 if self.within_aggregate {
6269 self.err = Some(sql_err!("nested aggregate functions are not allowed",));
6270 return;
6271 }
6272 self.aggs.push(func.clone());
6273 let Function {
6274 name: _,
6275 args,
6276 filter,
6277 over: _,
6278 distinct: _,
6279 } = func;
6280 if let Some(filter) = filter {
6281 self.visit_expr_mut(filter);
6282 }
6283 let old_within_aggregate = self.within_aggregate;
6284 self.within_aggregate = true;
6285 self.table_disallowed_context
6286 .push("aggregate function calls");
6287
6288 self.visit_function_args_mut(args);
6289
6290 self.within_aggregate = old_within_aggregate;
6291 self.table_disallowed_context.pop();
6292 }
6293 Ok(Func::Table { .. }) => {
6294 self.table_disallowed_context.push("other table functions");
6295 visit_mut::visit_function_mut(self, func);
6296 self.table_disallowed_context.pop();
6297 }
6298 _ => visit_mut::visit_function_mut(self, func),
6299 }
6300 }
6301
6302 fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
6303 }
6305
6306 fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
6307 let (disallowed_context, func) = match expr {
6308 Expr::Case { .. } => (Some("CASE"), None),
6309 Expr::HomogenizingFunction {
6310 function: HomogenizingFunction::Coalesce,
6311 ..
6312 } => (Some("COALESCE"), None),
6313 Expr::Function(func) if self.in_select_item => {
6314 let mut table_func = None;
6317 if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
6318 if let Ok(Func::Table { .. }) = item.func() {
6319 if let Some(context) = self.table_disallowed_context.last() {
6320 self.err = Some(sql_err!(
6321 "table functions are not allowed in {} (function {})",
6322 context,
6323 func.name
6324 ));
6325 return;
6326 }
6327 table_func = Some(func.clone());
6328 }
6329 }
6330 (None, table_func)
6333 }
6334 _ => (None, None),
6335 };
6336 if let Some(func) = func {
6337 visit_mut::visit_expr_mut(self, expr);
6339 if let Function {
6341 name: _,
6342 args: _,
6343 filter: None,
6344 over: None,
6345 distinct: false,
6346 } = &func
6347 {
6348 let unique_id = self.id_gen.allocate_id();
6350 let id = self
6351 .tables
6352 .entry(func)
6353 .or_insert_with(|| format!("table_func_{unique_id}"));
6354 *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
6357 }
6358 }
6359 if let Some(context) = disallowed_context {
6360 self.table_disallowed_context.push(context);
6361 }
6362
6363 visit_mut::visit_expr_mut(self, expr);
6364
6365 if disallowed_context.is_some() {
6366 self.table_disallowed_context.pop();
6367 }
6368 }
6369
6370 fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
6371 let old = self.in_select_item;
6372 self.in_select_item = true;
6373 visit_mut::visit_select_item_mut(self, si);
6374 self.in_select_item = old;
6375 }
6376}
6377
6378#[derive(Default)]
6379struct WindowFuncCollector {
6380 window_funcs: Vec<Expr<Aug>>,
6381}
6382
6383impl WindowFuncCollector {
6384 fn into_result(self) -> Vec<Expr<Aug>> {
6385 let mut seen = BTreeSet::new();
6387 let window_funcs_dedupped = self
6388 .window_funcs
6389 .into_iter()
6390 .filter(move |expr| seen.insert(expr.clone()))
6391 .rev()
6394 .collect();
6395 window_funcs_dedupped
6396 }
6397}
6398
6399impl Visit<'_, Aug> for WindowFuncCollector {
6400 fn visit_expr(&mut self, expr: &Expr<Aug>) {
6401 match expr {
6402 Expr::Function(func) => {
6403 if func.over.is_some() {
6404 self.window_funcs.push(expr.clone());
6405 }
6406 }
6407 _ => (),
6408 }
6409 visit::visit_expr(self, expr);
6410 }
6411
6412 fn visit_query(&mut self, _query: &Query<Aug>) {
6413 }
6415}
6416
6417#[derive(Debug, Eq, PartialEq, Copy, Clone)]
6419pub enum QueryLifetime {
6420 OneShot,
6422 Index,
6424 MaterializedView,
6426 Subscribe,
6428 View,
6430 Source,
6432}
6433
6434impl QueryLifetime {
6435 pub fn is_one_shot(&self) -> bool {
6439 let result = match self {
6440 QueryLifetime::OneShot => true,
6441 QueryLifetime::Index => false,
6442 QueryLifetime::MaterializedView => false,
6443 QueryLifetime::Subscribe => false,
6444 QueryLifetime::View => false,
6445 QueryLifetime::Source => false,
6446 };
6447 assert_eq!(!result, self.is_maintained());
6448 result
6449 }
6450
6451 pub fn is_maintained(&self) -> bool {
6454 match self {
6455 QueryLifetime::OneShot => false,
6456 QueryLifetime::Index => true,
6457 QueryLifetime::MaterializedView => true,
6458 QueryLifetime::Subscribe => true,
6459 QueryLifetime::View => true,
6460 QueryLifetime::Source => true,
6461 }
6462 }
6463
6464 pub fn allow_show(&self) -> bool {
6466 match self {
6467 QueryLifetime::OneShot => true,
6468 QueryLifetime::Index => false,
6469 QueryLifetime::MaterializedView => false,
6470 QueryLifetime::Subscribe => true, QueryLifetime::View => false,
6472 QueryLifetime::Source => false,
6473 }
6474 }
6475}
6476
6477#[derive(Debug, Clone)]
6479pub struct CteDesc {
6480 pub name: String,
6481 pub desc: RelationDesc,
6482}
6483
6484#[derive(Debug, Clone)]
6486pub struct QueryContext<'a> {
6487 pub scx: &'a StatementContext<'a>,
6489 pub lifetime: QueryLifetime,
6491 pub outer_scopes: Vec<Scope>,
6493 pub outer_relation_types: Vec<SqlRelationType>,
6495 pub ctes: BTreeMap<LocalId, CteDesc>,
6497 pub name_manager: Rc<RefCell<NameManager>>,
6499 pub recursion_guard: RecursionGuard,
6500}
6501
6502impl CheckedRecursion for QueryContext<'_> {
6503 fn recursion_guard(&self) -> &RecursionGuard {
6504 &self.recursion_guard
6505 }
6506}
6507
6508impl<'a> QueryContext<'a> {
6509 pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6510 QueryContext {
6511 scx,
6512 lifetime,
6513 outer_scopes: vec![],
6514 outer_relation_types: vec![],
6515 ctes: BTreeMap::new(),
6516 name_manager: Rc::new(RefCell::new(NameManager::new())),
6517 recursion_guard: RecursionGuard::with_limit(1024), }
6519 }
6520
6521 fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6522 expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6523 }
6524
6525 fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6528 let ctes = self.ctes.clone();
6529 let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6530 let outer_relation_types = iter::once(relation_type)
6531 .chain(self.outer_relation_types.clone())
6532 .collect();
6533 let name_manager = Rc::clone(&self.name_manager);
6535
6536 QueryContext {
6537 scx: self.scx,
6538 lifetime: self.lifetime,
6539 outer_scopes,
6540 outer_relation_types,
6541 ctes,
6542 name_manager,
6543 recursion_guard: self.recursion_guard.clone(),
6544 }
6545 }
6546
6547 fn empty_derived_context(&self) -> QueryContext<'a> {
6549 let scope = Scope::empty();
6550 let ty = SqlRelationType::empty();
6551 self.derived_context(scope, ty)
6552 }
6553
6554 pub fn resolve_table_name(
6557 &self,
6558 object: ResolvedItemName,
6559 ) -> Result<(HirRelationExpr, Scope), PlanError> {
6560 match object {
6561 ResolvedItemName::Item {
6562 id,
6563 full_name,
6564 version,
6565 ..
6566 } => {
6567 let name = full_name.into();
6568 let item = self.scx.get_item(&id).at_version(version);
6569 let desc = item
6570 .desc(&self.scx.catalog.resolve_full_name(item.name()))?
6571 .clone();
6572 let expr = HirRelationExpr::Get {
6573 id: Id::Global(item.global_id()),
6574 typ: desc.typ().clone(),
6575 };
6576
6577 let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6578
6579 Ok((expr, scope))
6580 }
6581 ResolvedItemName::Cte { id, name } => {
6582 let name = name.into();
6583 let cte = self.ctes.get(&id).unwrap();
6584 let expr = HirRelationExpr::Get {
6585 id: Id::Local(id),
6586 typ: cte.desc.typ().clone(),
6587 };
6588
6589 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6590
6591 Ok((expr, scope))
6592 }
6593 ResolvedItemName::ContinualTask { id, name } => {
6594 let cte = self.ctes.get(&id).unwrap();
6595 let expr = HirRelationExpr::Get {
6596 id: Id::Local(id),
6597 typ: cte.desc.typ().clone(),
6598 };
6599
6600 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6601
6602 Ok((expr, scope))
6603 }
6604 ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6605 }
6606 }
6607
6608 pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6611 self.scx.humanize_scalar_type(typ, postgres_compat)
6612 }
6613}
6614
6615#[derive(Debug, Clone)]
6617pub struct ExprContext<'a> {
6618 pub qcx: &'a QueryContext<'a>,
6619 pub name: &'a str,
6621 pub scope: &'a Scope,
6624 pub relation_type: &'a SqlRelationType,
6627 pub allow_aggregates: bool,
6629 pub allow_subqueries: bool,
6631 pub allow_parameters: bool,
6633 pub allow_windows: bool,
6635}
6636
6637impl CheckedRecursion for ExprContext<'_> {
6638 fn recursion_guard(&self) -> &RecursionGuard {
6639 &self.qcx.recursion_guard
6640 }
6641}
6642
6643impl<'a> ExprContext<'a> {
6644 pub fn catalog(&self) -> &dyn SessionCatalog {
6645 self.qcx.scx.catalog
6646 }
6647
6648 pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6649 let mut ecx = self.clone();
6650 ecx.name = name;
6651 ecx
6652 }
6653
6654 pub fn column_type<E>(&self, expr: &E) -> E::Type
6655 where
6656 E: AbstractExpr,
6657 {
6658 expr.typ(
6659 &self.qcx.outer_relation_types,
6660 self.relation_type,
6661 &self.qcx.scx.param_types.borrow(),
6662 )
6663 }
6664
6665 pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6666 where
6667 E: AbstractExpr,
6668 {
6669 self.column_type(expr).scalar_type()
6670 }
6671
6672 fn derived_query_context(&self) -> QueryContext<'_> {
6673 let mut scope = self.scope.clone();
6674 scope.lateral_barrier = true;
6675 self.qcx.derived_context(scope, self.relation_type.clone())
6676 }
6677
6678 pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6679 self.qcx.scx.require_feature_flag(flag)
6680 }
6681
6682 pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6683 &self.qcx.scx.param_types
6684 }
6685
6686 pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6689 self.qcx.scx.humanize_scalar_type(typ, postgres_compat)
6690 }
6691
6692 pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6693 self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6694 }
6695}
6696
6697#[derive(Debug, Clone)]
6703pub struct NameManager(BTreeSet<Arc<str>>);
6704
6705impl NameManager {
6706 pub fn new() -> Self {
6708 Self(BTreeSet::new())
6709 }
6710
6711 fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6714 let s = s.as_ref();
6715 if let Some(interned) = self.0.get(s) {
6716 Arc::clone(interned)
6717 } else {
6718 let interned: Arc<str> = Arc::from(s);
6719 self.0.insert(Arc::clone(&interned));
6720 interned
6721 }
6722 }
6723
6724 pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6727 self.intern(item.column_name.as_str())
6743 }
6744}
6745
6746#[cfg(test)]
6747mod test {
6748 use super::*;
6749
6750 #[mz_ore::test]
6755 pub fn test_name_manager_string_interning() {
6756 let mut nm = NameManager::new();
6757
6758 let orig_hi = "hi";
6759 let hi = nm.intern(orig_hi);
6760 let hello = nm.intern("hello");
6761
6762 assert_ne!(hi.as_ptr(), hello.as_ptr());
6763
6764 let hi2 = nm.intern("hi");
6766 assert_eq!(hi.as_ptr(), hi2.as_ptr());
6767
6768 let s = format!(
6770 "{}{}",
6771 hi.chars().nth(0).unwrap(),
6772 hi2.chars().nth(1).unwrap()
6773 );
6774 assert_ne!(orig_hi.as_ptr(), s.as_ptr());
6776
6777 let hi3 = nm.intern(s);
6778 assert_eq!(hi.as_ptr(), hi3.as_ptr());
6779 }
6780}