1use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::virtual_syntax::AlgExcept;
50use mz_expr::{
51 Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, TableFunc,
52 func as expr_func,
53};
54use mz_ore::assert_none;
55use mz_ore::collections::CollectionExt;
56use mz_ore::error::ErrorExt;
57use mz_ore::id_gen::IdGen;
58use mz_ore::option::FallibleMapExt;
59use mz_ore::stack::{CheckedRecursion, RecursionGuard};
60use mz_ore::str::StrExt;
61use mz_repr::adt::char::CharLength;
62use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
63use mz_repr::adt::timestamp::TimestampPrecision;
64use mz_repr::adt::varchar::VarCharMaxLength;
65use mz_repr::{
66 CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector, Row,
67 RowArena, SqlColumnType, SqlRelationType, SqlScalarType, UNKNOWN_COLUMN_NAME, strconv,
68};
69use mz_sql_parser::ast::display::AstDisplay;
70use mz_sql_parser::ast::visit::Visit;
71use mz_sql_parser::ast::visit_mut::{self, VisitMut};
72use mz_sql_parser::ast::{
73 AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
74 CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
75 Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
76 JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
77 MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
78 SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
79 TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
80 WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
81};
82use mz_sql_parser::ident;
83
84use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
85use crate::func::{self, Func, FuncSpec, TableFuncImpl};
86use crate::names::{
87 Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
88};
89use crate::plan::PlanError::InvalidWmrRecursionLimit;
90use crate::plan::error::PlanError;
91use crate::plan::hir::{
92 AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
93 BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
94 HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
95 ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
96};
97use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
98use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
99use crate::plan::statement::{StatementContext, StatementDesc, show};
100use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
101use crate::plan::{
102 Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
103 literal, transform_ast,
104};
105use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
106use crate::session::vars::{self, FeatureFlag};
107use crate::{ORDINALITY_COL_NAME, normalize};
108
109#[derive(Debug)]
110pub struct PlannedRootQuery<E> {
111 pub expr: E,
112 pub desc: RelationDesc,
113 pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
114 pub scope: Scope,
115}
116
117#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
126pub fn plan_root_query(
127 scx: &StatementContext,
128 mut query: Query<Aug>,
129 lifetime: QueryLifetime,
130) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
131 transform_ast::transform(scx, &mut query)?;
132 let mut qcx = QueryContext::root(scx, lifetime);
133 let PlannedQuery {
134 mut expr,
135 scope,
136 order_by,
137 limit,
138 offset,
139 project,
140 group_size_hints,
141 } = plan_query(&mut qcx, &query)?;
142
143 let mut finishing = RowSetFinishing {
144 limit,
145 offset,
146 project,
147 order_by,
148 };
149
150 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
156
157 if lifetime.is_maintained() {
158 expr.finish_maintained(&mut finishing, group_size_hints);
159 }
160
161 let typ = qcx.relation_type(&expr);
162 let typ = SqlRelationType::new(
163 finishing
164 .project
165 .iter()
166 .map(|i| typ.column_types[*i].clone())
167 .collect(),
168 );
169 let desc = RelationDesc::new(typ, scope.column_names());
170
171 Ok(PlannedRootQuery {
172 expr,
173 desc,
174 finishing,
175 scope,
176 })
177}
178
179#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
181pub fn plan_ct_query(
182 qcx: &mut QueryContext,
183 mut query: Query<Aug>,
184) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
185 transform_ast::transform(qcx.scx, &mut query)?;
186 let PlannedQuery {
187 mut expr,
188 scope,
189 order_by,
190 limit,
191 offset,
192 project,
193 group_size_hints,
194 } = plan_query(qcx, &query)?;
195
196 let mut finishing = RowSetFinishing {
197 limit,
198 offset,
199 project,
200 order_by,
201 };
202
203 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
209
210 expr.finish_maintained(&mut finishing, group_size_hints);
211
212 let typ = qcx.relation_type(&expr);
213 let typ = SqlRelationType::new(
214 finishing
215 .project
216 .iter()
217 .map(|i| typ.column_types[*i].clone())
218 .collect(),
219 );
220 let desc = RelationDesc::new(typ, scope.column_names());
221
222 Ok(PlannedRootQuery {
223 expr,
224 desc,
225 finishing,
226 scope,
227 })
228}
229
230fn try_push_projection_order_by(
241 expr: &mut HirRelationExpr,
242 project: &mut Vec<usize>,
243 order_by: &mut Vec<ColumnOrder>,
244) -> bool {
245 let mut unproject = vec![None; expr.arity()];
246 for (out_i, in_i) in project.iter().copied().enumerate() {
247 unproject[in_i] = Some(out_i);
248 }
249 if order_by
250 .iter()
251 .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
252 {
253 let trivial_project = (0..project.len()).collect();
254 *expr = expr.take().project(mem::replace(project, trivial_project));
255 for ob in order_by {
256 ob.column = unproject[ob.column].unwrap();
257 }
258 true
259 } else {
260 false
261 }
262}
263
264pub fn plan_insert_query(
265 scx: &StatementContext,
266 table_name: ResolvedItemName,
267 columns: Vec<Ident>,
268 source: InsertSource<Aug>,
269 returning: Vec<SelectItem<Aug>>,
270) -> Result<
271 (
272 CatalogItemId,
273 HirRelationExpr,
274 PlannedRootQuery<Vec<HirScalarExpr>>,
275 ),
276 PlanError,
277> {
278 let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
279 let table = scx.get_item_by_resolved_name(&table_name)?;
280
281 if table.item_type() != CatalogItemType::Table {
283 sql_bail!(
284 "cannot insert into {} '{}'",
285 table.item_type(),
286 table_name.full_name_str()
287 );
288 }
289 let desc = table.relation_desc().expect("table has desc");
290 let mut defaults = table
291 .writable_table_details()
292 .ok_or_else(|| {
293 sql_err!(
294 "cannot insert into non-writeable table '{}'",
295 table_name.full_name_str()
296 )
297 })?
298 .to_vec();
299
300 for default in &mut defaults {
301 transform_ast::transform(scx, default)?;
302 }
303
304 if table.id().is_system() {
305 sql_bail!(
306 "cannot insert into system table '{}'",
307 table_name.full_name_str()
308 );
309 }
310
311 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
312
313 let mut source_types = Vec::with_capacity(columns.len());
315 let mut ordering = Vec::with_capacity(columns.len());
316
317 if columns.is_empty() {
318 source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
321 ordering.extend(0..desc.arity());
322 } else {
323 let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
324 .iter()
325 .enumerate()
326 .map(|(idx, (name, typ))| (name, (idx, typ)))
327 .collect();
328
329 for c in &columns {
330 if let Some((idx, typ)) = column_by_name.get(c) {
331 ordering.push(*idx);
332 source_types.push(&typ.scalar_type);
333 } else {
334 sql_bail!(
335 "column {} of relation {} does not exist",
336 c.quoted(),
337 table_name.full_name_str().quoted()
338 );
339 }
340 }
341 if let Some(dup) = columns.iter().duplicates().next() {
342 sql_bail!("column {} specified more than once", dup.quoted());
343 }
344 };
345
346 let expr = match source {
348 InsertSource::Query(mut query) => {
349 transform_ast::transform(scx, &mut query)?;
350
351 match query {
352 Query {
354 body: SetExpr::Values(Values(values)),
355 ctes,
356 order_by,
357 limit: None,
358 offset: None,
359 } if ctes.is_empty() && order_by.is_empty() => {
360 let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
361 plan_values_insert(&qcx, &names, &source_types, &values)?
362 }
363 _ => {
364 let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
365 expr
366 }
367 }
368 }
369 InsertSource::DefaultValues => {
370 HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
371 }
372 };
373
374 let expr_arity = expr.arity();
375
376 let max_columns = if columns.is_empty() {
379 desc.arity()
380 } else {
381 columns.len()
382 };
383 if expr_arity > max_columns {
384 sql_bail!("INSERT has more expressions than target columns");
385 }
386 if expr_arity < columns.len() {
388 sql_bail!("INSERT has more target columns than expressions");
389 }
390
391 source_types.truncate(expr_arity);
393 ordering.truncate(expr_arity);
394
395 let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
398 sql_err!(
399 "column {} is of type {} but expression is of type {}",
400 desc.get_name(ordering[e.column]).quoted(),
401 qcx.humanize_scalar_type(&e.target_type, false),
402 qcx.humanize_scalar_type(&e.source_type, false),
403 )
404 })?;
405
406 let mut map_exprs = vec![];
408 let mut project_key = Vec::with_capacity(desc.arity());
409
410 let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();
412
413 let column_details = desc.iter_types().zip_eq(defaults).enumerate();
414 for (col_idx, (col_typ, default)) in column_details {
415 if let Some(src_idx) = col_to_source.get(&col_idx) {
416 project_key.push(*src_idx);
417 } else {
418 let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
419 project_key.push(expr_arity + map_exprs.len());
420 map_exprs.push(hir);
421 }
422 }
423
424 let returning = {
425 let (scope, typ) = if let ResolvedItemName::Item {
426 full_name,
427 version: _,
428 ..
429 } = table_name
430 {
431 let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
432 let typ = desc.typ().clone();
433 (scope, typ)
434 } else {
435 (Scope::empty(), SqlRelationType::empty())
436 };
437 let ecx = &ExprContext {
438 qcx: &qcx,
439 name: "RETURNING clause",
440 scope: &scope,
441 relation_type: &typ,
442 allow_aggregates: false,
443 allow_subqueries: false,
444 allow_parameters: true,
445 allow_windows: false,
446 };
447 let table_func_names = BTreeMap::new();
448 let mut output_columns = vec![];
449 let mut new_exprs = vec![];
450 let mut new_type = SqlRelationType::empty();
451 for mut si in returning {
452 transform_ast::transform(scx, &mut si)?;
453 for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
454 let expr = match &select_item {
455 ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
456 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
457 };
458 output_columns.push(column_name);
459 let typ = ecx.column_type(&expr);
460 new_type.column_types.push(typ);
461 new_exprs.push(expr);
462 }
463 }
464 let desc = RelationDesc::new(new_type, output_columns);
465 let desc_arity = desc.arity();
466 PlannedRootQuery {
467 expr: new_exprs,
468 desc,
469 finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
470 scope,
471 }
472 };
473
474 Ok((
475 table.id(),
476 expr.map(map_exprs).project(project_key),
477 returning,
478 ))
479}
480
481pub fn plan_copy_item(
492 scx: &StatementContext,
493 item_name: ResolvedItemName,
494 columns: Vec<Ident>,
495) -> Result<
496 (
497 CatalogItemId,
498 RelationDesc,
499 Vec<ColumnIndex>,
500 Option<MapFilterProject>,
501 ),
502 PlanError,
503> {
504 let item = scx.get_item_by_resolved_name(&item_name)?;
505 let fullname = scx.catalog.resolve_full_name(item.name());
506 let table_desc = match item.relation_desc() {
507 Some(desc) => desc.into_owned(),
508 None => {
509 return Err(PlanError::InvalidDependency {
510 name: fullname.to_string(),
511 item_type: item.item_type(),
512 });
513 }
514 };
515 let mut ordering = Vec::with_capacity(columns.len());
516
517 let mfp = if let Some(table_defaults) = item.writable_table_details() {
527 let mut table_defaults = table_defaults.to_vec();
528
529 for default in &mut table_defaults {
530 transform_ast::transform(scx, default)?;
531 }
532
533 let source_column_names: Vec<_> = columns
535 .iter()
536 .cloned()
537 .map(normalize::column_name)
538 .collect();
539
540 let mut default_exprs = Vec::new();
541 let mut project_keys = Vec::with_capacity(table_desc.arity());
542
543 let column_details = table_desc.iter().zip_eq(table_defaults);
546 for ((col_name, col_type), col_default) in column_details {
547 let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
548 if let Some(src_idx) = maybe_src_idx {
549 project_keys.push(src_idx);
550 } else {
551 let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
554 let mir = hir.lower_uncorrelated()?;
555 project_keys.push(source_column_names.len() + default_exprs.len());
556 default_exprs.push(mir);
557 }
558 }
559
560 let mfp = MapFilterProject::new(source_column_names.len())
561 .map(default_exprs)
562 .project(project_keys);
563 Some(mfp)
564 } else {
565 None
566 };
567
568 let source_desc = if columns.is_empty() {
570 let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
571 ordering.extend(indexes);
572
573 table_desc
575 } else {
576 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
577 let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
578 .iter_all()
579 .map(|(idx, name, typ)| (name, (*idx, typ)))
580 .collect();
581
582 let mut names = Vec::with_capacity(columns.len());
583 let mut source_types = Vec::with_capacity(columns.len());
584
585 for c in &columns {
586 if let Some((idx, typ)) = column_by_name.get(c) {
587 ordering.push(*idx);
588 source_types.push((*typ).clone());
589 names.push(c.clone());
590 } else {
591 sql_bail!(
592 "column {} of relation {} does not exist",
593 c.quoted(),
594 item_name.full_name_str().quoted()
595 );
596 }
597 }
598 if let Some(dup) = columns.iter().duplicates().next() {
599 sql_bail!("column {} specified more than once", dup.quoted());
600 }
601
602 RelationDesc::new(SqlRelationType::new(source_types), names)
604 };
605
606 Ok((item.id(), source_desc, ordering, mfp))
607}
608
609pub fn plan_copy_from(
613 scx: &StatementContext,
614 table_name: ResolvedItemName,
615 columns: Vec<Ident>,
616) -> Result<
617 (
618 CatalogItemId,
619 RelationDesc,
620 Vec<ColumnIndex>,
621 Option<MapFilterProject>,
622 ),
623 PlanError,
624> {
625 let table = scx.get_item_by_resolved_name(&table_name)?;
626
627 if table.item_type() != CatalogItemType::Table {
629 sql_bail!(
630 "cannot insert into {} '{}'",
631 table.item_type(),
632 table_name.full_name_str()
633 );
634 }
635
636 let _ = table.writable_table_details().ok_or_else(|| {
637 sql_err!(
638 "cannot insert into non-writeable table '{}'",
639 table_name.full_name_str()
640 )
641 })?;
642
643 if table.id().is_system() {
644 sql_bail!(
645 "cannot insert into system table '{}'",
646 table_name.full_name_str()
647 );
648 }
649 let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
650
651 Ok((id, desc, ordering, mfp))
652}
653
654pub fn plan_copy_from_rows(
657 pcx: &PlanContext,
658 catalog: &dyn SessionCatalog,
659 target_id: CatalogItemId,
660 target_name: String,
661 columns: Vec<ColumnIndex>,
662 rows: Vec<mz_repr::Row>,
663) -> Result<HirRelationExpr, PlanError> {
664 let scx = StatementContext::new(Some(pcx), catalog);
665
666 let table = catalog
668 .try_get_item(&target_id)
669 .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
670 .at_version(RelationVersionSelector::Latest);
671
672 let mut defaults = table
673 .writable_table_details()
674 .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
675 .to_vec();
676
677 for default in &mut defaults {
678 transform_ast::transform(&scx, default)?;
679 }
680
681 let desc = table.relation_desc().expect("table has desc");
682 let column_types = columns
683 .iter()
684 .map(|x| desc.get_type(x).clone())
685 .map(|mut x| {
686 x.nullable = true;
689 x
690 })
691 .collect();
692 let typ = SqlRelationType::new(column_types);
693 let expr = HirRelationExpr::Constant {
694 rows,
695 typ: typ.clone(),
696 };
697
698 let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
704 if columns == default {
705 return Ok(expr);
706 }
707
708 let mut map_exprs = vec![];
710 let mut project_key = Vec::with_capacity(desc.arity());
711
712 let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
714
715 let column_details = desc.iter_all().zip_eq(defaults);
716 for ((col_idx, _col_name, col_typ), default) in column_details {
717 if let Some(src_idx) = col_to_source.get(&col_idx) {
718 project_key.push(*src_idx);
719 } else {
720 let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
721 project_key.push(typ.arity() + map_exprs.len());
722 map_exprs.push(hir);
723 }
724 }
725
726 Ok(expr.map(map_exprs).project(project_key))
727}
728
729pub struct ReadThenWritePlan {
731 pub id: CatalogItemId,
732 pub selection: HirRelationExpr,
737 pub assignments: BTreeMap<usize, HirScalarExpr>,
739 pub finishing: RowSetFinishing,
740}
741
742pub fn plan_delete_query(
743 scx: &StatementContext,
744 mut delete_stmt: DeleteStatement<Aug>,
745) -> Result<ReadThenWritePlan, PlanError> {
746 transform_ast::transform(scx, &mut delete_stmt)?;
747
748 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
749 plan_mutation_query_inner(
750 qcx,
751 delete_stmt.table_name,
752 delete_stmt.alias,
753 delete_stmt.using,
754 vec![],
755 delete_stmt.selection,
756 )
757}
758
759pub fn plan_update_query(
760 scx: &StatementContext,
761 mut update_stmt: UpdateStatement<Aug>,
762) -> Result<ReadThenWritePlan, PlanError> {
763 transform_ast::transform(scx, &mut update_stmt)?;
764
765 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
766
767 plan_mutation_query_inner(
768 qcx,
769 update_stmt.table_name,
770 update_stmt.alias,
771 vec![],
772 update_stmt.assignments,
773 update_stmt.selection,
774 )
775}
776
777pub fn plan_mutation_query_inner(
778 qcx: QueryContext,
779 table_name: ResolvedItemName,
780 alias: Option<TableAlias>,
781 using: Vec<TableWithJoins<Aug>>,
782 assignments: Vec<Assignment<Aug>>,
783 selection: Option<Expr<Aug>>,
784) -> Result<ReadThenWritePlan, PlanError> {
785 let (id, version) = match table_name {
787 ResolvedItemName::Item { id, version, .. } => (id, version),
788 _ => sql_bail!("cannot mutate non-user table"),
789 };
790
791 let item = qcx.scx.get_item(&id).at_version(version);
793 if item.item_type() != CatalogItemType::Table {
794 sql_bail!(
795 "cannot mutate {} '{}'",
796 item.item_type(),
797 table_name.full_name_str()
798 );
799 }
800 let _ = item.writable_table_details().ok_or_else(|| {
801 sql_err!(
802 "cannot mutate non-writeable table '{}'",
803 table_name.full_name_str()
804 )
805 })?;
806 if id.is_system() {
807 sql_bail!(
808 "cannot mutate system table '{}'",
809 table_name.full_name_str()
810 );
811 }
812
813 let (mut get, scope) = qcx.resolve_table_name(table_name)?;
815 let scope = plan_table_alias(scope, alias.as_ref())?;
816 let desc = item.relation_desc().expect("table has desc");
817 let relation_type = qcx.relation_type(&get);
818
819 if using.is_empty() {
820 if let Some(expr) = selection {
821 let ecx = &ExprContext {
822 qcx: &qcx,
823 name: "WHERE clause",
824 scope: &scope,
825 relation_type: &relation_type,
826 allow_aggregates: false,
827 allow_subqueries: true,
828 allow_parameters: true,
829 allow_windows: false,
830 };
831 let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
832 get = get.filter(vec![expr]);
833 }
834 } else {
835 get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
836 }
837
838 let mut sets = BTreeMap::new();
839 for Assignment { id, value } in assignments {
840 let name = normalize::column_name(id);
842 match desc.get_by_name(&name) {
843 Some((idx, typ)) => {
844 let ecx = &ExprContext {
845 qcx: &qcx,
846 name: "SET clause",
847 scope: &scope,
848 relation_type: &relation_type,
849 allow_aggregates: false,
850 allow_subqueries: false,
851 allow_parameters: true,
852 allow_windows: false,
853 };
854 let expr = plan_expr(ecx, &value)?.cast_to(
855 ecx,
856 CastContext::Assignment,
857 &typ.scalar_type,
858 )?;
859
860 if sets.insert(idx, expr).is_some() {
861 sql_bail!("column {} set twice", name)
862 }
863 }
864 None => sql_bail!("unknown column {}", name),
865 };
866 }
867
868 let finishing = RowSetFinishing {
869 order_by: vec![],
870 limit: None,
871 offset: 0,
872 project: (0..desc.arity()).collect(),
873 };
874
875 Ok(ReadThenWritePlan {
876 id,
877 selection: get,
878 finishing,
879 assignments: sets,
880 })
881}
882
883fn handle_mutation_using_clause(
895 qcx: &QueryContext,
896 selection: Option<Expr<Aug>>,
897 using: Vec<TableWithJoins<Aug>>,
898 get: HirRelationExpr,
899 outer_scope: Scope,
900) -> Result<HirRelationExpr, PlanError> {
901 let (mut using_rel_expr, using_scope) =
905 using.into_iter().try_fold(plan_join_identity(), |l, twj| {
906 let (left, left_scope) = l;
907 plan_join(
908 qcx,
909 left,
910 left_scope,
911 &Join {
912 relation: TableFactor::NestedJoin {
913 join: Box::new(twj),
914 alias: None,
915 },
916 join_operator: JoinOperator::CrossJoin,
917 },
918 )
919 })?;
920
921 if let Some(expr) = selection {
922 let on = HirScalarExpr::literal_true();
928 let joined = using_rel_expr
929 .clone()
930 .join(get.clone(), on, JoinKind::Inner);
931 let joined_scope = using_scope.product(outer_scope)?;
932 let joined_relation_type = qcx.relation_type(&joined);
933
934 let ecx = &ExprContext {
935 qcx,
936 name: "WHERE clause",
937 scope: &joined_scope,
938 relation_type: &joined_relation_type,
939 allow_aggregates: false,
940 allow_subqueries: true,
941 allow_parameters: true,
942 allow_windows: false,
943 };
944
945 let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
947
948 let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
952 use mz_expr::visit::Visit;
954 expr.visit_mut_post(&mut |e| {
955 if let HirScalarExpr::Column(c, _name) = e {
956 if c.column >= using_rel_arity {
957 c.level += 1;
958 c.column -= using_rel_arity;
959 };
960 }
961 })?;
962
963 using_rel_expr = using_rel_expr.filter(vec![expr]);
967 } else {
968 let _joined_scope = using_scope.product(outer_scope)?;
971 }
972 Ok(get.filter(vec![using_rel_expr.exists()]))
983}
984
985#[derive(Debug)]
986pub(crate) struct CastRelationError {
987 pub(crate) column: usize,
988 pub(crate) source_type: SqlScalarType,
989 pub(crate) target_type: SqlScalarType,
990}
991
992pub(crate) fn cast_relation<'a, I>(
996 qcx: &QueryContext,
997 ccx: CastContext,
998 expr: HirRelationExpr,
999 target_types: I,
1000) -> Result<HirRelationExpr, CastRelationError>
1001where
1002 I: IntoIterator<Item = &'a SqlScalarType>,
1003{
1004 let ecx = &ExprContext {
1005 qcx,
1006 name: "values",
1007 scope: &Scope::empty(),
1008 relation_type: &qcx.relation_type(&expr),
1009 allow_aggregates: false,
1010 allow_subqueries: true,
1011 allow_parameters: true,
1012 allow_windows: false,
1013 };
1014 let mut map_exprs = vec![];
1015 let mut project_key = vec![];
1016 for (i, target_typ) in target_types.into_iter().enumerate() {
1017 let expr = HirScalarExpr::column(i);
1018 match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
1022 Ok(cast_expr) => {
1023 if expr == cast_expr {
1024 project_key.push(i);
1026 } else {
1027 project_key.push(ecx.relation_type.arity() + map_exprs.len());
1029 map_exprs.push(cast_expr);
1030 }
1031 }
1032 Err(_) => {
1033 return Err(CastRelationError {
1034 column: i,
1035 source_type: ecx.scalar_type(&expr),
1036 target_type: target_typ.clone(),
1037 });
1038 }
1039 }
1040 }
1041 Ok(expr.map(map_exprs).project(project_key))
1042}
1043
1044pub fn plan_as_of(
1047 scx: &StatementContext,
1048 as_of: Option<AsOf<Aug>>,
1049) -> Result<QueryWhen, PlanError> {
1050 match as_of {
1051 None => Ok(QueryWhen::Immediately),
1052 Some(as_of) => match as_of {
1053 AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1054 AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1055 },
1056 }
1057}
1058
1059pub fn plan_as_of_or_up_to(
1069 scx: &StatementContext,
1070 mut expr: Expr<Aug>,
1071) -> Result<mz_repr::Timestamp, PlanError> {
1072 let scope = Scope::empty();
1073 let desc = RelationDesc::empty();
1074 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1077 transform_ast::transform(scx, &mut expr)?;
1078 let ecx = &ExprContext {
1079 qcx: &qcx,
1080 name: "AS OF or UP TO",
1081 scope: &scope,
1082 relation_type: desc.typ(),
1083 allow_aggregates: false,
1084 allow_subqueries: false,
1085 allow_parameters: false,
1086 allow_windows: false,
1087 };
1088 let hir = plan_expr(ecx, &expr)?.cast_to(
1089 ecx,
1090 CastContext::Assignment,
1091 &SqlScalarType::MzTimestamp,
1092 )?;
1093 if hir.contains_unmaterializable() {
1094 bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1095 }
1096 let timestamp = hir
1103 .into_literal_mz_timestamp()
1104 .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1105 Ok(timestamp)
1106}
1107
1108pub fn plan_secret_as(
1110 scx: &StatementContext,
1111 mut expr: Expr<Aug>,
1112) -> Result<MirScalarExpr, PlanError> {
1113 let scope = Scope::empty();
1114 let desc = RelationDesc::empty();
1115 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1116
1117 transform_ast::transform(scx, &mut expr)?;
1118
1119 let ecx = &ExprContext {
1120 qcx: &qcx,
1121 name: "AS",
1122 scope: &scope,
1123 relation_type: desc.typ(),
1124 allow_aggregates: false,
1125 allow_subqueries: false,
1126 allow_parameters: false,
1127 allow_windows: false,
1128 };
1129 let expr = plan_expr(ecx, &expr)?
1130 .type_as(ecx, &SqlScalarType::Bytes)?
1131 .lower_uncorrelated()?;
1132 Ok(expr)
1133}
1134
1135pub fn plan_webhook_validate_using(
1137 scx: &StatementContext,
1138 validate_using: CreateWebhookSourceCheck<Aug>,
1139) -> Result<WebhookValidation, PlanError> {
1140 let qcx = QueryContext::root(scx, QueryLifetime::Source);
1141
1142 let CreateWebhookSourceCheck {
1143 options,
1144 using: mut expr,
1145 } = validate_using;
1146
1147 let mut column_typs = vec![];
1148 let mut column_names = vec![];
1149
1150 let (bodies, headers, secrets) = options
1151 .map(|o| (o.bodies, o.headers, o.secrets))
1152 .unwrap_or_default();
1153
1154 let mut body_tuples = vec![];
1156 for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1157 let scalar_type = use_bytes
1158 .then_some(SqlScalarType::Bytes)
1159 .unwrap_or(SqlScalarType::String);
1160 let name = alias
1161 .map(|a| a.into_string())
1162 .unwrap_or_else(|| "body".to_string());
1163
1164 column_typs.push(SqlColumnType {
1165 scalar_type,
1166 nullable: false,
1167 });
1168 column_names.push(name);
1169
1170 let column_idx = column_typs.len() - 1;
1172 assert_eq!(
1174 column_idx,
1175 column_names.len() - 1,
1176 "body column names and types don't match"
1177 );
1178 body_tuples.push((column_idx, use_bytes));
1179 }
1180
1181 let mut header_tuples = vec![];
1183
1184 for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1185 let value_type = use_bytes
1186 .then_some(SqlScalarType::Bytes)
1187 .unwrap_or(SqlScalarType::String);
1188 let name = alias
1189 .map(|a| a.into_string())
1190 .unwrap_or_else(|| "headers".to_string());
1191
1192 column_typs.push(SqlColumnType {
1193 scalar_type: SqlScalarType::Map {
1194 value_type: Box::new(value_type),
1195 custom_id: None,
1196 },
1197 nullable: false,
1198 });
1199 column_names.push(name);
1200
1201 let column_idx = column_typs.len() - 1;
1203 assert_eq!(
1205 column_idx,
1206 column_names.len() - 1,
1207 "header column names and types don't match"
1208 );
1209 header_tuples.push((column_idx, use_bytes));
1210 }
1211
1212 let mut validation_secrets = vec![];
1214
1215 for CreateWebhookSourceSecret {
1216 secret,
1217 alias,
1218 use_bytes,
1219 } in secrets
1220 {
1221 let scalar_type = use_bytes
1223 .then_some(SqlScalarType::Bytes)
1224 .unwrap_or(SqlScalarType::String);
1225
1226 column_typs.push(SqlColumnType {
1227 scalar_type,
1228 nullable: false,
1229 });
1230 let ResolvedItemName::Item {
1231 id,
1232 full_name: FullItemName { item, .. },
1233 ..
1234 } = secret
1235 else {
1236 return Err(PlanError::InvalidSecret(Box::new(secret)));
1237 };
1238
1239 let name = if let Some(alias) = alias {
1241 alias.into_string()
1242 } else {
1243 item
1244 };
1245 column_names.push(name);
1246
1247 let column_idx = column_typs.len() - 1;
1250 assert_eq!(
1252 column_idx,
1253 column_names.len() - 1,
1254 "column names and types don't match"
1255 );
1256
1257 validation_secrets.push(WebhookValidationSecret {
1258 id,
1259 column_idx,
1260 use_bytes,
1261 });
1262 }
1263
1264 let relation_typ = SqlRelationType::new(column_typs);
1265 let desc = RelationDesc::new(relation_typ, column_names.clone());
1266 let scope = Scope::from_source(None, column_names);
1267
1268 transform_ast::transform(scx, &mut expr)?;
1269
1270 let ecx = &ExprContext {
1271 qcx: &qcx,
1272 name: "CHECK",
1273 scope: &scope,
1274 relation_type: desc.typ(),
1275 allow_aggregates: false,
1276 allow_subqueries: false,
1277 allow_parameters: false,
1278 allow_windows: false,
1279 };
1280 let expr = plan_expr(ecx, &expr)?
1281 .type_as(ecx, &SqlScalarType::Bool)?
1282 .lower_uncorrelated()?;
1283 let validation = WebhookValidation {
1284 expression: expr,
1285 relation_desc: desc,
1286 bodies: body_tuples,
1287 headers: header_tuples,
1288 secrets: validation_secrets,
1289 };
1290 Ok(validation)
1291}
1292
1293pub fn plan_default_expr(
1294 scx: &StatementContext,
1295 expr: &Expr<Aug>,
1296 target_ty: &SqlScalarType,
1297) -> Result<HirScalarExpr, PlanError> {
1298 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1299 let ecx = &ExprContext {
1300 qcx: &qcx,
1301 name: "DEFAULT expression",
1302 scope: &Scope::empty(),
1303 relation_type: &SqlRelationType::empty(),
1304 allow_aggregates: false,
1305 allow_subqueries: false,
1306 allow_parameters: false,
1307 allow_windows: false,
1308 };
1309 let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1310 Ok(hir)
1311}
1312
1313pub fn plan_params<'a>(
1314 scx: &'a StatementContext,
1315 params: Vec<Expr<Aug>>,
1316 desc: &StatementDesc,
1317) -> Result<Params, PlanError> {
1318 if params.len() != desc.param_types.len() {
1319 sql_bail!(
1320 "expected {} params, got {}",
1321 desc.param_types.len(),
1322 params.len()
1323 );
1324 }
1325
1326 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1327
1328 let mut datums = Row::default();
1329 let mut packer = datums.packer();
1330 let mut actual_types = Vec::new();
1331 let temp_storage = &RowArena::new();
1332 for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1333 transform_ast::transform(scx, &mut expr)?;
1334
1335 let ecx = execute_expr_context(&qcx);
1336 let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1337 let actual_ty = ecx.scalar_type(&ex);
1338 if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1339 return Err(PlanError::WrongParameterType(
1340 i + 1,
1341 ecx.humanize_scalar_type(expected_ty, false),
1342 ecx.humanize_scalar_type(&actual_ty, false),
1343 ));
1344 }
1345 let ex = ex.lower_uncorrelated()?;
1346 let evaled = ex.eval(&[], temp_storage)?;
1347 packer.push(evaled);
1348 actual_types.push(actual_ty);
1349 }
1350 Ok(Params {
1351 datums,
1352 execute_types: actual_types,
1353 expected_types: desc.param_types.clone(),
1354 })
1355}
1356
1357static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
1358static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1359
1360pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1362 ExprContext {
1363 qcx,
1364 name: "EXECUTE",
1365 scope: &EXECUTE_CONTEXT_SCOPE,
1366 relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1367 allow_aggregates: false,
1368 allow_subqueries: false,
1369 allow_parameters: false,
1370 allow_windows: false,
1371 }
1372}
1373
1374pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
1379 LazyLock::new(|| CastContext::Assignment);
1380
1381pub fn plan_index_exprs<'a>(
1382 scx: &'a StatementContext,
1383 on_desc: &RelationDesc,
1384 exprs: Vec<Expr<Aug>>,
1385) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1386 let scope = Scope::from_source(None, on_desc.iter_names());
1387 let qcx = QueryContext::root(scx, QueryLifetime::Index);
1388
1389 let ecx = &ExprContext {
1390 qcx: &qcx,
1391 name: "CREATE INDEX",
1392 scope: &scope,
1393 relation_type: on_desc.typ(),
1394 allow_aggregates: false,
1395 allow_subqueries: false,
1396 allow_parameters: false,
1397 allow_windows: false,
1398 };
1399 let mut out = vec![];
1400 for mut expr in exprs {
1401 transform_ast::transform(scx, &mut expr)?;
1402 let expr = plan_expr_or_col_index(ecx, &expr)?;
1403 let mut expr = expr.lower_uncorrelated()?;
1404 expr.reduce(&on_desc.typ().column_types);
1405 out.push(expr);
1406 }
1407 Ok(out)
1408}
1409
1410fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1411 match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1412 Some(column) => Ok(HirScalarExpr::column(column)),
1413 _ => plan_expr(ecx, e)?.type_as_any(ecx),
1414 }
1415}
1416
1417fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1418 match e {
1419 Expr::Value(Value::Number(n)) => {
1420 let n = n.parse::<usize>().map_err(|e| {
1421 sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1422 })?;
1423 if n < 1 || n > max {
1424 sql_bail!(
1425 "column reference {} in {} is out of range (1 - {})",
1426 n,
1427 name,
1428 max
1429 );
1430 }
1431 Ok(Some(n - 1))
1432 }
1433 _ => Ok(None),
1434 }
1435}
1436
1437struct PlannedQuery {
1438 expr: HirRelationExpr,
1439 scope: Scope,
1440 order_by: Vec<ColumnOrder>,
1441 limit: Option<HirScalarExpr>,
1442 offset: HirScalarExpr,
1448 project: Vec<usize>,
1449 group_size_hints: GroupSizeHints,
1450}
1451
1452fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1453 qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1454}
1455
1456fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1457 let cte_bindings = plan_ctes(qcx, q)?;
1460
1461 let limit = match &q.limit {
1462 None => None,
1463 Some(Limit {
1464 quantity,
1465 with_ties: false,
1466 }) => {
1467 let ecx = &ExprContext {
1468 qcx,
1469 name: "LIMIT",
1470 scope: &Scope::empty(),
1471 relation_type: &SqlRelationType::empty(),
1472 allow_aggregates: false,
1473 allow_subqueries: true,
1474 allow_parameters: true,
1475 allow_windows: false,
1476 };
1477 let limit = plan_expr(ecx, quantity)?;
1478 let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1479
1480 let limit = if limit.is_constant() {
1481 let arena = RowArena::new();
1482 let limit = limit.lower_uncorrelated()?;
1483
1484 match limit.eval(&[], &arena)? {
1488 d @ Datum::Int64(v) if v >= 0 => {
1489 HirScalarExpr::literal(d, SqlScalarType::Int64)
1490 }
1491 d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1492 Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1493 _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1494 }
1495 } else {
1496 qcx.scx
1498 .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1499 limit
1500 };
1501
1502 Some(limit)
1503 }
1504 Some(Limit {
1505 quantity: _,
1506 with_ties: true,
1507 }) => bail_unsupported!("FETCH ... WITH TIES"),
1508 };
1509
1510 let offset = match &q.offset {
1511 None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1512 Some(offset) => {
1513 let ecx = &ExprContext {
1514 qcx,
1515 name: "OFFSET",
1516 scope: &Scope::empty(),
1517 relation_type: &SqlRelationType::empty(),
1518 allow_aggregates: false,
1519 allow_subqueries: false,
1520 allow_parameters: true,
1521 allow_windows: false,
1522 };
1523 let offset = plan_expr(ecx, offset)?;
1524 let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1525
1526 let offset = if offset.is_constant() {
1527 let offset_value = offset_into_value(offset)?;
1529 HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1530 } else {
1531 if !offset.contains_parameters() {
1535 return Err(PlanError::InvalidOffset(format!(
1536 "must be simplifiable to a constant, possibly after parameter binding, got {}",
1537 offset
1538 )));
1539 }
1540 offset
1541 };
1542 offset
1543 }
1544 };
1545
1546 let mut planned_query = match &q.body {
1547 SetExpr::Select(s) => {
1548 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1550 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1551
1552 let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1553 PlannedQuery {
1554 expr: plan.expr,
1555 scope: plan.scope,
1556 order_by: plan.order_by,
1557 project: plan.project,
1558 limit,
1559 offset,
1560 group_size_hints,
1561 }
1562 }
1563 _ => {
1564 let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1565 let ecx = &ExprContext {
1566 qcx,
1567 name: "ORDER BY clause of a set expression",
1568 scope: &scope,
1569 relation_type: &qcx.relation_type(&expr),
1570 allow_aggregates: false,
1571 allow_subqueries: true,
1572 allow_parameters: true,
1573 allow_windows: false,
1574 };
1575 let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1576 let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1577 let project = (0..ecx.relation_type.arity()).collect();
1578 PlannedQuery {
1579 expr: expr.map(map_exprs),
1580 scope,
1581 order_by,
1582 limit,
1583 project,
1584 offset,
1585 group_size_hints: GroupSizeHints::default(),
1586 }
1587 }
1588 };
1589
1590 match &q.ctes {
1592 CteBlock::Simple(_) => {
1593 for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1594 if let Some(cte) = qcx.ctes.remove(&id) {
1595 planned_query.expr = HirRelationExpr::Let {
1596 name: cte.name,
1597 id: id.clone(),
1598 value: Box::new(value),
1599 body: Box::new(planned_query.expr),
1600 };
1601 }
1602 if let Some(shadowed_val) = shadowed_val {
1603 qcx.ctes.insert(id, shadowed_val);
1604 }
1605 }
1606 }
1607 CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1608 let MutRecBlockOptionExtracted {
1609 recursion_limit,
1610 return_at_recursion_limit,
1611 error_at_recursion_limit,
1612 seen: _,
1613 } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1614 let limit = match (recursion_limit, return_at_recursion_limit, error_at_recursion_limit) {
1615 (None, None, None) => None,
1616 (Some(max_iters), None, None) => Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT)),
1617 (None, Some(max_iters), None) => Some((max_iters, true)),
1618 (None, None, Some(max_iters)) => Some((max_iters, false)),
1619 _ => {
1620 return Err(InvalidWmrRecursionLimit("More than one recursion limit given. Please give at most one of RECURSION LIMIT, ERROR AT RECURSION LIMIT, RETURN AT RECURSION LIMIT.".to_owned()));
1621 }
1622 }.try_map(|(max_iters, return_at_limit)| Ok::<LetRecLimit, PlanError>(LetRecLimit {
1623 max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit("Recursion limit has to be greater than 0.".to_owned()))?,
1624 return_at_limit: *return_at_limit,
1625 }))?;
1626
1627 let mut bindings = Vec::new();
1628 for (id, value, shadowed_val) in cte_bindings.into_iter() {
1629 if let Some(cte) = qcx.ctes.remove(&id) {
1630 bindings.push((cte.name, id, value, cte.desc.into_typ()));
1631 }
1632 if let Some(shadowed_val) = shadowed_val {
1633 qcx.ctes.insert(id, shadowed_val);
1634 }
1635 }
1636 if !bindings.is_empty() {
1637 planned_query.expr = HirRelationExpr::LetRec {
1638 limit,
1639 bindings,
1640 body: Box::new(planned_query.expr),
1641 }
1642 }
1643 }
1644 }
1645
1646 Ok(planned_query)
1647}
1648
1649pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1651 let offset = offset
1652 .try_into_literal_int64()
1653 .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1654 if offset < 0 {
1655 return Err(PlanError::InvalidOffset(format!(
1656 "must not be negative, got {}",
1657 offset
1658 )));
1659 }
1660 Ok(offset)
1661}
1662
1663generate_extracted_config!(
1664 MutRecBlockOption,
1665 (RecursionLimit, u64),
1666 (ReturnAtRecursionLimit, u64),
1667 (ErrorAtRecursionLimit, u64)
1668);
1669
1670pub fn plan_ctes(
1675 qcx: &mut QueryContext,
1676 q: &Query<Aug>,
1677) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1678 let mut result = Vec::new();
1680 let mut shadowed_descs = BTreeMap::new();
1683
1684 if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1686 sql_bail!(
1687 "WITH query name {} specified more than once",
1688 normalize::ident_ref(ident).quoted()
1689 )
1690 }
1691
1692 match &q.ctes {
1693 CteBlock::Simple(ctes) => {
1694 for cte in ctes.iter() {
1696 let cte_name = normalize::ident(cte.alias.name.clone());
1697 let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1698 let typ = qcx.relation_type(&val);
1699 let mut desc = RelationDesc::new(typ, scope.column_names());
1700 plan_utils::maybe_rename_columns(
1701 format!("CTE {}", cte.alias.name),
1702 &mut desc,
1703 &cte.alias.columns,
1704 )?;
1705 let shadowed = qcx.ctes.insert(
1707 cte.id,
1708 CteDesc {
1709 name: cte_name,
1710 desc,
1711 },
1712 );
1713
1714 result.push((cte.id, val, shadowed));
1715 }
1716 }
1717 CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1718 for cte in ctes.iter() {
1720 let cte_name = normalize::ident(cte.name.clone());
1721 let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1722 for column in cte.columns.iter() {
1723 desc_columns.push((
1724 normalize::column_name(column.name.clone()),
1725 SqlColumnType {
1726 scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1727 nullable: true,
1728 },
1729 ));
1730 }
1731 let desc = RelationDesc::from_names_and_types(desc_columns);
1732 let shadowed = qcx.ctes.insert(
1733 cte.id,
1734 CteDesc {
1735 name: cte_name,
1736 desc,
1737 },
1738 );
1739 if let Some(shadowed) = shadowed {
1741 shadowed_descs.insert(cte.id, shadowed);
1742 }
1743 }
1744
1745 for cte in ctes.iter() {
1747 let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1748
1749 let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1750
1751 if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1752 sql_bail!(
1755 "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1756 );
1757 }
1758
1759 if !proposed_typ.keys.is_empty() {
1760 sql_bail!("[internal error]: WMR CTEs do not support keys");
1763 }
1764
1765 let derived_typ = qcx.relation_type(&val);
1767
1768 let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1769 let cte_name = normalize::ident(cte.name.clone());
1770 let proposed_typ = proposed_typ
1771 .column_types
1772 .iter()
1773 .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1774 .collect::<Vec<_>>();
1775 let inferred_typ = derived_typ
1776 .column_types
1777 .iter()
1778 .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1779 .collect::<Vec<_>>();
1780 Err(PlanError::RecursiveTypeMismatch(
1781 cte_name,
1782 proposed_typ,
1783 inferred_typ,
1784 ))
1785 };
1786
1787 if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1788 return type_err(proposed_typ, derived_typ);
1789 }
1790
1791 let val = match cast_relation(
1793 qcx,
1794 CastContext::Assignment,
1799 val,
1800 proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1801 ) {
1802 Ok(val) => val,
1803 Err(_) => return type_err(proposed_typ, derived_typ),
1804 };
1805
1806 result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1807 }
1808 }
1809 }
1810
1811 Ok(result)
1812}
1813
1814pub fn plan_nested_query(
1815 qcx: &mut QueryContext,
1816 q: &Query<Aug>,
1817) -> Result<(HirRelationExpr, Scope), PlanError> {
1818 let PlannedQuery {
1819 mut expr,
1820 scope,
1821 order_by,
1822 limit,
1823 offset,
1824 project,
1825 group_size_hints,
1826 } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1827 if limit.is_some()
1828 || !offset
1829 .clone()
1830 .try_into_literal_int64()
1831 .is_ok_and(|offset| offset == 0)
1832 {
1833 expr = HirRelationExpr::top_k(
1834 expr,
1835 vec![],
1836 order_by,
1837 limit,
1838 offset,
1839 group_size_hints.limit_input_group_size,
1840 );
1841 }
1842 Ok((expr.project(project), scope))
1843}
1844
1845fn plan_set_expr(
1846 qcx: &mut QueryContext,
1847 q: &SetExpr<Aug>,
1848) -> Result<(HirRelationExpr, Scope), PlanError> {
1849 match q {
1850 SetExpr::Select(select) => {
1851 let order_by_exprs = Vec::new();
1852 let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
1853 assert!(plan.order_by.is_empty());
1856 Ok((plan.expr.project(plan.project), plan.scope))
1857 }
1858 SetExpr::SetOperation {
1859 op,
1860 all,
1861 left,
1862 right,
1863 } => {
1864 let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
1866 let (right_expr, right_scope) =
1867 qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;
1868
1869 let left_type = qcx.relation_type(&left_expr);
1871 let right_type = qcx.relation_type(&right_expr);
1872 if left_type.arity() != right_type.arity() {
1873 sql_bail!(
1874 "each {} query must have the same number of columns: {} vs {}",
1875 op,
1876 left_type.arity(),
1877 right_type.arity(),
1878 );
1879 }
1880
1881 let left_ecx = &ExprContext {
1886 qcx,
1887 name: &op.to_string(),
1888 scope: &left_scope,
1889 relation_type: &left_type,
1890 allow_aggregates: false,
1891 allow_subqueries: false,
1892 allow_parameters: false,
1893 allow_windows: false,
1894 };
1895 let right_ecx = &ExprContext {
1896 qcx,
1897 name: &op.to_string(),
1898 scope: &right_scope,
1899 relation_type: &right_type,
1900 allow_aggregates: false,
1901 allow_subqueries: false,
1902 allow_parameters: false,
1903 allow_windows: false,
1904 };
1905 let mut left_casts = vec![];
1906 let mut right_casts = vec![];
1907 for (i, (left_type, right_type)) in left_type
1908 .column_types
1909 .iter()
1910 .zip_eq(right_type.column_types.iter())
1911 .enumerate()
1912 {
1913 let types = &[
1914 CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
1915 CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
1916 ];
1917 let target =
1918 typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
1919 match typeconv::plan_cast(
1920 left_ecx,
1921 CastContext::Implicit,
1922 HirScalarExpr::column(i),
1923 &target,
1924 ) {
1925 Ok(expr) => left_casts.push(expr),
1926 Err(_) => sql_bail!(
1927 "{} types {} and {} cannot be matched",
1928 op,
1929 qcx.humanize_scalar_type(&left_type.scalar_type, false),
1930 qcx.humanize_scalar_type(&target, false),
1931 ),
1932 }
1933 match typeconv::plan_cast(
1934 right_ecx,
1935 CastContext::Implicit,
1936 HirScalarExpr::column(i),
1937 &target,
1938 ) {
1939 Ok(expr) => right_casts.push(expr),
1940 Err(_) => sql_bail!(
1941 "{} types {} and {} cannot be matched",
1942 op,
1943 qcx.humanize_scalar_type(&target, false),
1944 qcx.humanize_scalar_type(&right_type.scalar_type, false),
1945 ),
1946 }
1947 }
1948 let lhs = if left_casts
1949 .iter()
1950 .enumerate()
1951 .any(|(i, e)| e != &HirScalarExpr::column(i))
1952 {
1953 let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
1954 left_expr.map(left_casts).project(project_key)
1955 } else {
1956 left_expr
1957 };
1958 let rhs = if right_casts
1959 .iter()
1960 .enumerate()
1961 .any(|(i, e)| e != &HirScalarExpr::column(i))
1962 {
1963 let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
1964 right_expr.map(right_casts).project(project_key)
1965 } else {
1966 right_expr
1967 };
1968
1969 let relation_expr = match op {
1970 SetOperator::Union => {
1971 if *all {
1972 lhs.union(rhs)
1973 } else {
1974 lhs.union(rhs).distinct()
1975 }
1976 }
1977 SetOperator::Except => Hir::except(all, lhs, rhs),
1978 SetOperator::Intersect => {
1979 let left_clone = lhs.clone();
1985 if *all {
1986 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1987 } else {
1988 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1989 .distinct()
1990 }
1991 }
1992 };
1993 let scope = Scope::from_source(
1994 None,
1995 left_scope.column_names(),
1997 );
1998
1999 Ok((relation_expr, scope))
2000 }
2001 SetExpr::Values(Values(values)) => plan_values(qcx, values),
2002 SetExpr::Table(name) => {
2003 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2004 Ok((expr, scope))
2005 }
2006 SetExpr::Query(query) => {
2007 let (expr, scope) = plan_nested_query(qcx, query)?;
2008 Ok((expr, scope))
2009 }
2010 SetExpr::Show(stmt) => {
2011 if !qcx.lifetime.allow_show() {
2025 return Err(PlanError::ShowCommandInView);
2026 }
2027
2028 fn to_hirscope(
2031 plan: ShowCreatePlan,
2032 desc: StatementDesc,
2033 ) -> Result<(HirRelationExpr, Scope), PlanError> {
2034 let rows = vec![plan.row.iter().collect::<Vec<_>>()];
2035 let desc = desc.relation_desc.expect("must exist");
2036 let scope = Scope::from_source(None, desc.iter_names());
2037 let expr = HirRelationExpr::constant(rows, desc.into_typ());
2038 Ok((expr, scope))
2039 }
2040
2041 match stmt.clone() {
2042 ShowStatement::ShowColumns(stmt) => {
2043 show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
2044 }
2045 ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
2046 show::plan_show_create_connection(qcx.scx, stmt.clone())?,
2047 show::describe_show_create_connection(qcx.scx, stmt)?,
2048 ),
2049 ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
2050 show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
2051 show::describe_show_create_cluster(qcx.scx, stmt)?,
2052 ),
2053 ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
2054 show::plan_show_create_index(qcx.scx, stmt.clone())?,
2055 show::describe_show_create_index(qcx.scx, stmt)?,
2056 ),
2057 ShowStatement::ShowCreateSink(stmt) => to_hirscope(
2058 show::plan_show_create_sink(qcx.scx, stmt.clone())?,
2059 show::describe_show_create_sink(qcx.scx, stmt)?,
2060 ),
2061 ShowStatement::ShowCreateSource(stmt) => to_hirscope(
2062 show::plan_show_create_source(qcx.scx, stmt.clone())?,
2063 show::describe_show_create_source(qcx.scx, stmt)?,
2064 ),
2065 ShowStatement::ShowCreateTable(stmt) => to_hirscope(
2066 show::plan_show_create_table(qcx.scx, stmt.clone())?,
2067 show::describe_show_create_table(qcx.scx, stmt)?,
2068 ),
2069 ShowStatement::ShowCreateView(stmt) => to_hirscope(
2070 show::plan_show_create_view(qcx.scx, stmt.clone())?,
2071 show::describe_show_create_view(qcx.scx, stmt)?,
2072 ),
2073 ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
2074 show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
2075 show::describe_show_create_materialized_view(qcx.scx, stmt)?,
2076 ),
2077 ShowStatement::ShowCreateType(stmt) => to_hirscope(
2078 show::plan_show_create_type(qcx.scx, stmt.clone())?,
2079 show::describe_show_create_type(qcx.scx, stmt)?,
2080 ),
2081 ShowStatement::ShowObjects(stmt) => {
2082 show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
2083 }
2084 ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
2085 ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
2086 }
2087 }
2088 }
2089}
2090
2091fn plan_values(
2093 qcx: &QueryContext,
2094 values: &[Vec<Expr<Aug>>],
2095) -> Result<(HirRelationExpr, Scope), PlanError> {
2096 assert!(!values.is_empty());
2097
2098 let ecx = &ExprContext {
2099 qcx,
2100 name: "VALUES",
2101 scope: &Scope::empty(),
2102 relation_type: &SqlRelationType::empty(),
2103 allow_aggregates: false,
2104 allow_subqueries: true,
2105 allow_parameters: true,
2106 allow_windows: false,
2107 };
2108
2109 let ncols = values[0].len();
2110 let nrows = values.len();
2111
2112 let mut cols = vec![vec![]; ncols];
2115 for row in values {
2116 if row.len() != ncols {
2117 sql_bail!(
2118 "VALUES expression has varying number of columns: {} vs {}",
2119 row.len(),
2120 ncols
2121 );
2122 }
2123 for (i, v) in row.iter().enumerate() {
2124 cols[i].push(v);
2125 }
2126 }
2127
2128 let mut col_iters = Vec::with_capacity(ncols);
2130 let mut col_types = Vec::with_capacity(ncols);
2131 for col in &cols {
2132 let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2133 let mut col_type = ecx.column_type(&col[0]);
2134 for val in &col[1..] {
2135 col_type = col_type.union(&ecx.column_type(val))?;
2136 }
2137 col_types.push(col_type);
2138 col_iters.push(col.into_iter());
2139 }
2140
2141 let mut exprs = vec![];
2143 for _ in 0..nrows {
2144 for i in 0..ncols {
2145 exprs.push(col_iters[i].next().unwrap());
2146 }
2147 }
2148 let out = HirRelationExpr::CallTable {
2149 func: TableFunc::Wrap {
2150 width: ncols,
2151 types: col_types,
2152 },
2153 exprs,
2154 };
2155
2156 let mut scope = Scope::empty();
2158 for i in 0..ncols {
2159 let name = format!("column{}", i + 1);
2160 scope.items.push(ScopeItem::from_column_name(name));
2161 }
2162
2163 Ok((out, scope))
2164}
2165
2166fn plan_values_insert(
2176 qcx: &QueryContext,
2177 target_names: &[&ColumnName],
2178 target_types: &[&SqlScalarType],
2179 values: &[Vec<Expr<Aug>>],
2180) -> Result<HirRelationExpr, PlanError> {
2181 assert!(!values.is_empty());
2182
2183 if !values.iter().map(|row| row.len()).all_equal() {
2184 sql_bail!("VALUES lists must all be the same length");
2185 }
2186
2187 let ecx = &ExprContext {
2188 qcx,
2189 name: "VALUES",
2190 scope: &Scope::empty(),
2191 relation_type: &SqlRelationType::empty(),
2192 allow_aggregates: false,
2193 allow_subqueries: true,
2194 allow_parameters: true,
2195 allow_windows: false,
2196 };
2197
2198 let mut exprs = vec![];
2199 let mut types = vec![];
2200 for row in values {
2201 if row.len() > target_names.len() {
2202 sql_bail!("INSERT has more expressions than target columns");
2203 }
2204 for (column, val) in row.into_iter().enumerate() {
2205 let target_type = &target_types[column];
2206 let val = plan_expr(ecx, val)?;
2207 let val = typeconv::plan_coerce(ecx, val, target_type)?;
2208 let source_type = &ecx.scalar_type(&val);
2209 let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2210 Ok(val) => val,
2211 Err(_) => sql_bail!(
2212 "column {} is of type {} but expression is of type {}",
2213 target_names[column].quoted(),
2214 qcx.humanize_scalar_type(target_type, false),
2215 qcx.humanize_scalar_type(source_type, false),
2216 ),
2217 };
2218 if column >= types.len() {
2219 types.push(ecx.column_type(&val));
2220 } else {
2221 types[column] = types[column].union(&ecx.column_type(&val))?;
2222 }
2223 exprs.push(val);
2224 }
2225 }
2226
2227 Ok(HirRelationExpr::CallTable {
2228 func: TableFunc::Wrap {
2229 width: values[0].len(),
2230 types,
2231 },
2232 exprs,
2233 })
2234}
2235
2236fn plan_join_identity() -> (HirRelationExpr, Scope) {
2237 let typ = SqlRelationType::new(vec![]);
2238 let expr = HirRelationExpr::constant(vec![vec![]], typ);
2239 let scope = Scope::empty();
2240 (expr, scope)
2241}
2242
2243#[derive(Debug)]
2249struct SelectPlan {
2250 expr: HirRelationExpr,
2251 scope: Scope,
2252 order_by: Vec<ColumnOrder>,
2253 project: Vec<usize>,
2254}
2255
2256generate_extracted_config!(
2257 SelectOption,
2258 (ExpectedGroupSize, u64),
2259 (AggregateInputGroupSize, u64),
2260 (DistinctOnInputGroupSize, u64),
2261 (LimitInputGroupSize, u64)
2262);
2263
2264fn plan_select_from_where(
2282 qcx: &QueryContext,
2283 mut s: Select<Aug>,
2284 mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2285) -> Result<SelectPlan, PlanError> {
2286 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2293 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2294
2295 let (mut relation_expr, mut from_scope) =
2297 s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2298 let (left, left_scope) = l;
2299 plan_join(
2300 qcx,
2301 left,
2302 left_scope,
2303 &Join {
2304 relation: TableFactor::NestedJoin {
2305 join: Box::new(twj.clone()),
2306 alias: None,
2307 },
2308 join_operator: JoinOperator::CrossJoin,
2309 },
2310 )
2311 })?;
2312
2313 if let Some(selection) = &s.selection {
2315 let ecx = &ExprContext {
2316 qcx,
2317 name: "WHERE clause",
2318 scope: &from_scope,
2319 relation_type: &qcx.relation_type(&relation_expr),
2320 allow_aggregates: false,
2321 allow_subqueries: true,
2322 allow_parameters: true,
2323 allow_windows: false,
2324 };
2325 let expr = plan_expr(ecx, selection)
2326 .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2327 .type_as(ecx, &SqlScalarType::Bool)?;
2328 relation_expr = relation_expr.filter(vec![expr]);
2329 }
2330
2331 let (aggregates, table_funcs) = {
2334 let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2335 visitor.visit_select_mut(&mut s);
2336 for o in order_by_exprs.iter_mut() {
2337 visitor.visit_order_by_expr_mut(o);
2338 }
2339 visitor.into_result()?
2340 };
2341 let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2342 if !table_funcs.is_empty() {
2343 let (expr, scope) = plan_scalar_table_funcs(
2344 qcx,
2345 table_funcs,
2346 &mut table_func_names,
2347 &relation_expr,
2348 &from_scope,
2349 )?;
2350 relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2351 from_scope = from_scope.product(scope)?;
2352 }
2353
2354 let projection = {
2356 let ecx = &ExprContext {
2357 qcx,
2358 name: "SELECT clause",
2359 scope: &from_scope,
2360 relation_type: &qcx.relation_type(&relation_expr),
2361 allow_aggregates: true,
2362 allow_subqueries: true,
2363 allow_parameters: true,
2364 allow_windows: true,
2365 };
2366 let mut out = vec![];
2367 for si in &s.projection {
2368 if *si == SelectItem::Wildcard && s.from.is_empty() {
2369 sql_bail!("SELECT * with no tables specified is not valid");
2370 }
2371 out.extend(expand_select_item(ecx, si, &table_func_names)?);
2372 }
2373 out
2374 };
2375
2376 let (mut group_scope, select_all_mapping) = {
2380 let ecx = &ExprContext {
2382 qcx,
2383 name: "GROUP BY clause",
2384 scope: &from_scope,
2385 relation_type: &qcx.relation_type(&relation_expr),
2386 allow_aggregates: false,
2387 allow_subqueries: true,
2388 allow_parameters: true,
2389 allow_windows: false,
2390 };
2391 let mut group_key = vec![];
2392 let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2393 let mut group_hir_exprs = vec![];
2394 let mut group_scope = Scope::empty();
2395 let mut select_all_mapping = BTreeMap::new();
2396
2397 for group_expr in &s.group_by {
2398 let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2399 let new_column = group_key.len();
2400
2401 if let Some(group_expr) = group_expr {
2402 if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2406 existing_scope_item.exprs.insert(group_expr.clone());
2407 continue;
2408 }
2409 }
2410
2411 let mut scope_item = if let HirScalarExpr::Column(
2412 ColumnRef {
2413 level: 0,
2414 column: old_column,
2415 },
2416 _name,
2417 ) = &expr
2418 {
2419 select_all_mapping.insert(*old_column, new_column);
2425 let scope_item = ecx.scope.items[*old_column].clone();
2426 scope_item
2427 } else {
2428 ScopeItem::empty()
2429 };
2430
2431 if let Some(group_expr) = group_expr.cloned() {
2432 scope_item.exprs.insert(group_expr);
2433 }
2434
2435 group_key.push(from_scope.len() + group_exprs.len());
2436 group_hir_exprs.push(expr.clone());
2437 group_exprs.insert(expr, scope_item);
2438 }
2439
2440 assert_eq!(group_hir_exprs.len(), group_exprs.len());
2441 for expr in &group_hir_exprs {
2442 if let Some(scope_item) = group_exprs.remove(expr) {
2443 group_scope.items.push(scope_item);
2444 }
2445 }
2446
2447 let ecx = &ExprContext {
2449 qcx,
2450 name: "aggregate function",
2451 scope: &from_scope,
2452 relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2453 allow_aggregates: false,
2454 allow_subqueries: true,
2455 allow_parameters: true,
2456 allow_windows: false,
2457 };
2458 let mut agg_exprs = vec![];
2459 for sql_function in aggregates {
2460 if sql_function.over.is_some() {
2461 unreachable!(
2462 "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2463 );
2464 }
2465 agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2466 group_scope
2467 .items
2468 .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2469 }
2470 if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2471 relation_expr = relation_expr.map(group_hir_exprs).reduce(
2473 group_key,
2474 agg_exprs,
2475 group_size_hints.aggregate_input_group_size,
2476 );
2477
2478 for i in 0..from_scope.len() {
2484 if !select_all_mapping.contains_key(&i) {
2485 let scope_item = &ecx.scope.items[i];
2486 group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2487 table_name: scope_item.table_name.clone(),
2488 column_name: scope_item.column_name.clone(),
2489 allow_unqualified_references: scope_item.allow_unqualified_references,
2490 });
2491 }
2492 }
2493
2494 (group_scope, select_all_mapping)
2495 } else {
2496 (
2498 from_scope.clone(),
2499 (0..from_scope.len()).map(|i| (i, i)).collect(),
2500 )
2501 }
2502 };
2503
2504 if let Some(ref having) = s.having {
2506 let ecx = &ExprContext {
2507 qcx,
2508 name: "HAVING clause",
2509 scope: &group_scope,
2510 relation_type: &qcx.relation_type(&relation_expr),
2511 allow_aggregates: true,
2512 allow_subqueries: true,
2513 allow_parameters: true,
2514 allow_windows: false,
2515 };
2516 let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2517 relation_expr = relation_expr.filter(vec![expr]);
2518 }
2519
2520 let window_funcs = {
2533 let mut visitor = WindowFuncCollector::default();
2534 visitor.visit_select(&s);
2538 for o in order_by_exprs.iter() {
2539 visitor.visit_order_by_expr(o);
2540 }
2541 visitor.into_result()
2542 };
2543 for window_func in window_funcs {
2544 let ecx = &ExprContext {
2545 qcx,
2546 name: "window function",
2547 scope: &group_scope,
2548 relation_type: &qcx.relation_type(&relation_expr),
2549 allow_aggregates: true,
2550 allow_subqueries: true,
2551 allow_parameters: true,
2552 allow_windows: true,
2553 };
2554 relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2555 group_scope.items.push(ScopeItem::from_expr(window_func));
2556 }
2557 if let Some(ref qualify) = s.qualify {
2564 let ecx = &ExprContext {
2565 qcx,
2566 name: "QUALIFY clause",
2567 scope: &group_scope,
2568 relation_type: &qcx.relation_type(&relation_expr),
2569 allow_aggregates: true,
2570 allow_subqueries: true,
2571 allow_parameters: true,
2572 allow_windows: true,
2573 };
2574 let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2575 relation_expr = relation_expr.filter(vec![expr]);
2576 }
2577
2578 let output_columns = {
2580 let mut new_exprs = vec![];
2581 let mut new_type = qcx.relation_type(&relation_expr);
2582 let mut output_columns = vec![];
2583 for (select_item, column_name) in &projection {
2584 let ecx = &ExprContext {
2585 qcx,
2586 name: "SELECT clause",
2587 scope: &group_scope,
2588 relation_type: &new_type,
2589 allow_aggregates: true,
2590 allow_subqueries: true,
2591 allow_parameters: true,
2592 allow_windows: true,
2593 };
2594 let expr = match select_item {
2595 ExpandedSelectItem::InputOrdinal(i) => {
2596 if let Some(column) = select_all_mapping.get(i).copied() {
2597 HirScalarExpr::column(column)
2598 } else {
2599 return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2600 }
2601 }
2602 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2603 };
2604 if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2605 output_columns.push((column, column_name));
2607 } else {
2608 let typ = ecx.column_type(&expr);
2615 new_type.column_types.push(typ);
2616 new_exprs.push(expr);
2617 output_columns.push((group_scope.len(), column_name));
2618 group_scope
2619 .items
2620 .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2621 }
2622 }
2623 relation_expr = relation_expr.map(new_exprs);
2624 output_columns
2625 };
2626 let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2627
2628 let order_by = {
2630 let relation_type = qcx.relation_type(&relation_expr);
2631 let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2632 &ExprContext {
2633 qcx,
2634 name: "ORDER BY clause",
2635 scope: &group_scope,
2636 relation_type: &relation_type,
2637 allow_aggregates: true,
2638 allow_subqueries: true,
2639 allow_parameters: true,
2640 allow_windows: true,
2641 },
2642 &order_by_exprs,
2643 &output_columns,
2644 )?;
2645
2646 match s.distinct {
2647 None => relation_expr = relation_expr.map(map_exprs),
2648 Some(Distinct::EntireRow) => {
2649 if relation_type.arity() == 0 {
2650 sql_bail!("SELECT DISTINCT must have at least one column");
2651 }
2652 if !try_push_projection_order_by(
2656 &mut relation_expr,
2657 &mut project_key,
2658 &mut order_by,
2659 ) {
2660 sql_bail!(
2661 "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2662 );
2663 }
2664 assert!(map_exprs.is_empty());
2665 relation_expr = relation_expr.distinct();
2666 }
2667 Some(Distinct::On(exprs)) => {
2668 let ecx = &ExprContext {
2669 qcx,
2670 name: "DISTINCT ON clause",
2671 scope: &group_scope,
2672 relation_type: &qcx.relation_type(&relation_expr),
2673 allow_aggregates: true,
2674 allow_subqueries: true,
2675 allow_parameters: true,
2676 allow_windows: true,
2677 };
2678
2679 let mut distinct_exprs = vec![];
2680 for expr in &exprs {
2681 let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2682 distinct_exprs.push(expr);
2683 }
2684
2685 let mut distinct_key = vec![];
2686
2687 let arity = relation_type.arity();
2697 for ord in order_by.iter().take(distinct_exprs.len()) {
2698 let mut expr = &HirScalarExpr::column(ord.column);
2701 if ord.column >= arity {
2702 expr = &map_exprs[ord.column - arity];
2703 };
2704 match distinct_exprs.iter().position(move |e| e == expr) {
2705 None => sql_bail!(
2706 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2707 ),
2708 Some(pos) => {
2709 distinct_exprs.remove(pos);
2710 }
2711 }
2712 distinct_key.push(ord.column);
2713 }
2714
2715 for expr in distinct_exprs {
2717 let column = match expr {
2720 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2721 _ => {
2722 map_exprs.push(expr);
2723 arity + map_exprs.len() - 1
2724 }
2725 };
2726 distinct_key.push(column);
2727 }
2728
2729 let distinct_len = distinct_key.len();
2734 relation_expr = HirRelationExpr::top_k(
2735 relation_expr.map(map_exprs),
2736 distinct_key,
2737 order_by.iter().skip(distinct_len).cloned().collect(),
2738 Some(HirScalarExpr::literal(
2739 Datum::Int64(1),
2740 SqlScalarType::Int64,
2741 )),
2742 HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2743 group_size_hints.distinct_on_input_group_size,
2744 );
2745 }
2746 }
2747
2748 order_by
2749 };
2750
2751 let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2756
2757 Ok(SelectPlan {
2758 expr: relation_expr,
2759 scope,
2760 order_by,
2761 project: project_key,
2762 })
2763}
2764
2765fn plan_scalar_table_funcs(
2766 qcx: &QueryContext,
2767 table_funcs: BTreeMap<Function<Aug>, String>,
2768 table_func_names: &mut BTreeMap<String, Ident>,
2769 relation_expr: &HirRelationExpr,
2770 from_scope: &Scope,
2771) -> Result<(HirRelationExpr, Scope), PlanError> {
2772 let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2773
2774 for (table_func, id) in table_funcs.iter() {
2775 table_func_names.insert(
2776 id.clone(),
2777 Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2779 );
2780 }
2781 if table_funcs.len() == 1 {
2784 let (table_func, id) = table_funcs.iter().next().unwrap();
2785 let (expr, mut scope) =
2786 plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2787
2788 let num_cols = scope.len();
2790 for i in 0..scope.len() {
2791 scope.items[i].table_name = Some(PartialItemName {
2792 database: None,
2793 schema: None,
2794 item: id.clone(),
2795 });
2796 scope.items[i].from_single_column_function = num_cols == 1;
2797 scope.items[i].allow_unqualified_references = false;
2798 }
2799 return Ok((expr, scope));
2800 }
2801 let (expr, mut scope, num_cols) =
2803 plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2804
2805 let mut i = 0;
2807 for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2808 for _ in 0..num_cols {
2809 scope.items[i].table_name = Some(PartialItemName {
2810 database: None,
2811 schema: None,
2812 item: id.clone(),
2813 });
2814 scope.items[i].from_single_column_function = num_cols == 1;
2815 scope.items[i].allow_unqualified_references = false;
2816 i += 1;
2817 }
2818 scope.items[i].table_name = Some(PartialItemName {
2822 database: None,
2823 schema: None,
2824 item: id.clone(),
2825 });
2826 scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2827 scope.items[i].allow_unqualified_references = false;
2828 i += 1;
2829 }
2830 scope.items[i].allow_unqualified_references = false;
2832 Ok((expr, scope))
2833}
2834
2835fn plan_group_by_expr<'a>(
2842 ecx: &ExprContext,
2843 group_expr: &'a Expr<Aug>,
2844 projection: &'a [(ExpandedSelectItem, ColumnName)],
2845) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2846 let plan_projection = |column: usize| match &projection[column].0 {
2847 ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2848 ExpandedSelectItem::Expr(expr) => {
2849 Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2850 }
2851 };
2852
2853 if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2856 return plan_projection(column);
2857 }
2858
2859 match group_expr {
2863 Expr::Identifier(names) => match plan_identifier(ecx, names) {
2864 Err(PlanError::UnknownColumn {
2865 table: None,
2866 column,
2867 similar,
2868 }) => {
2869 let mut iter = projection.iter().map(|(_expr, name)| name);
2872 if let Some(i) = iter.position(|n| *n == column) {
2873 if iter.any(|n| *n == column) {
2874 Err(PlanError::AmbiguousColumn(column))
2875 } else {
2876 plan_projection(i)
2877 }
2878 } else {
2879 Err(PlanError::UnknownColumn {
2882 table: None,
2883 column,
2884 similar,
2885 })
2886 }
2887 }
2888 res => Ok((Some(group_expr), res?)),
2889 },
2890 _ => Ok((
2891 Some(group_expr),
2892 plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2893 )),
2894 }
2895}
2896
2897pub(crate) fn plan_order_by_exprs(
2905 ecx: &ExprContext,
2906 order_by_exprs: &[OrderByExpr<Aug>],
2907 output_columns: &[(usize, &ColumnName)],
2908) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2909 let mut order_by = vec![];
2910 let mut map_exprs = vec![];
2911 for obe in order_by_exprs {
2912 let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2913 let column = match expr {
2916 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2917 _ => {
2918 map_exprs.push(expr);
2919 ecx.relation_type.arity() + map_exprs.len() - 1
2920 }
2921 };
2922 order_by.push(resolve_desc_and_nulls_last(obe, column));
2923 }
2924 Ok((order_by, map_exprs))
2925}
2926
2927fn plan_order_by_or_distinct_expr(
2945 ecx: &ExprContext,
2946 expr: &Expr<Aug>,
2947 output_columns: &[(usize, &ColumnName)],
2948) -> Result<HirScalarExpr, PlanError> {
2949 if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2950 return Ok(HirScalarExpr::column(output_columns[i].0));
2951 }
2952
2953 if let Expr::Identifier(names) = expr {
2954 if let [name] = &names[..] {
2955 let name = normalize::column_name(name.clone());
2956 let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2957 if let Some((i, _)) = iter.next() {
2958 match iter.next() {
2959 Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2963 _ => return Ok(HirScalarExpr::column(*i)),
2964 }
2965 }
2966 }
2967 }
2968
2969 plan_expr(ecx, expr)?.type_as_any(ecx)
2970}
2971
2972fn plan_table_with_joins(
2973 qcx: &QueryContext,
2974 table_with_joins: &TableWithJoins<Aug>,
2975) -> Result<(HirRelationExpr, Scope), PlanError> {
2976 let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2977 for join in &table_with_joins.joins {
2978 let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2979 expr = new_expr;
2980 scope = new_scope;
2981 }
2982 Ok((expr, scope))
2983}
2984
2985fn plan_table_factor(
2986 qcx: &QueryContext,
2987 table_factor: &TableFactor<Aug>,
2988) -> Result<(HirRelationExpr, Scope), PlanError> {
2989 match table_factor {
2990 TableFactor::Table { name, alias } => {
2991 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2992 let scope = plan_table_alias(scope, alias.as_ref())?;
2993 Ok((expr, scope))
2994 }
2995
2996 TableFactor::Function {
2997 function,
2998 alias,
2999 with_ordinality,
3000 } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
3001
3002 TableFactor::RowsFrom {
3003 functions,
3004 alias,
3005 with_ordinality,
3006 } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
3007
3008 TableFactor::Derived {
3009 lateral,
3010 subquery,
3011 alias,
3012 } => {
3013 let mut qcx = (*qcx).clone();
3014 if !lateral {
3015 for scope in &mut qcx.outer_scopes {
3019 if scope.lateral_barrier {
3020 break;
3021 }
3022 scope.items.clear();
3023 }
3024 }
3025 qcx.outer_scopes[0].lateral_barrier = true;
3026 let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3027 let scope = plan_table_alias(scope, alias.as_ref())?;
3028 Ok((expr, scope))
3029 }
3030
3031 TableFactor::NestedJoin { join, alias } => {
3032 let (expr, scope) = plan_table_with_joins(qcx, join)?;
3033 let scope = plan_table_alias(scope, alias.as_ref())?;
3034 Ok((expr, scope))
3035 }
3036 }
3037}
3038
3039fn plan_rows_from(
3081 qcx: &QueryContext,
3082 functions: &[Function<Aug>],
3083 alias: Option<&TableAlias>,
3084 with_ordinality: bool,
3085) -> Result<(HirRelationExpr, Scope), PlanError> {
3086 if let [function] = functions {
3089 return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3090 }
3091
3092 let (expr, mut scope, num_cols) = plan_rows_from_internal(
3096 qcx,
3097 functions,
3098 Some(functions[0].name.full_item_name().clone()),
3099 )?;
3100
3101 let mut columns = Vec::new();
3103 let mut offset = 0;
3104 for (idx, cols) in num_cols.into_iter().enumerate() {
3106 for i in 0..cols {
3107 columns.push(offset + i);
3108 }
3109 offset += cols + 1;
3110
3111 scope.items.remove(offset - idx - 1);
3114 }
3115
3116 if with_ordinality {
3119 columns.push(offset);
3120 } else {
3121 scope.items.pop();
3122 }
3123
3124 let expr = expr.project(columns);
3125
3126 let scope = plan_table_alias(scope, alias)?;
3127 Ok((expr, scope))
3128}
3129
3130fn plan_rows_from_internal<'a>(
3153 qcx: &QueryContext,
3154 functions: impl IntoIterator<Item = &'a Function<Aug>>,
3155 table_name: Option<FullItemName>,
3156) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3157 let mut functions = functions.into_iter();
3158 let mut num_cols = Vec::new();
3159
3160 let (mut left_expr, mut left_scope) =
3164 plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3165 num_cols.push(left_scope.len() - 1);
3166 left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3168 left_scope
3169 .items
3170 .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));
3171
3172 for function in functions {
3173 let qcx = qcx.empty_derived_context();
3175 let (right_expr, mut right_scope) =
3176 plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3177 num_cols.push(right_scope.len() - 1);
3178 let left_col = left_scope.len() - 1;
3179 let right_col = left_scope.len() + right_scope.len() - 1;
3180 let on = HirScalarExpr::call_binary(
3181 HirScalarExpr::column(left_col),
3182 HirScalarExpr::column(right_col),
3183 expr_func::Eq,
3184 );
3185 left_expr = left_expr
3186 .join(right_expr, on, JoinKind::FullOuter)
3187 .map(vec![HirScalarExpr::call_variadic(
3188 VariadicFunc::Coalesce,
3189 vec![
3190 HirScalarExpr::column(left_col),
3191 HirScalarExpr::column(right_col),
3192 ],
3193 )]);
3194
3195 left_expr = left_expr.project(
3198 (0..left_col) .chain(left_col + 1..right_col + 2) .collect(),
3201 );
3202 right_scope.items.push(left_scope.items.pop().unwrap());
3204
3205 left_scope.items.extend(right_scope.items);
3206 }
3207
3208 Ok((left_expr, left_scope, num_cols))
3209}
3210
3211fn plan_solitary_table_function(
3215 qcx: &QueryContext,
3216 function: &Function<Aug>,
3217 alias: Option<&TableAlias>,
3218 with_ordinality: bool,
3219) -> Result<(HirRelationExpr, Scope), PlanError> {
3220 let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3221
3222 let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3223 if single_column_function {
3224 let item = &mut scope.items[0];
3225
3226 item.from_single_column_function = true;
3229
3230 if let Some(alias) = alias {
3245 if let ScopeItem {
3246 table_name: Some(table_name),
3247 column_name,
3248 ..
3249 } = item
3250 {
3251 if table_name.item.as_str() == column_name.as_str() {
3252 *column_name = normalize::column_name(alias.name.clone());
3253 }
3254 }
3255 }
3256 }
3257
3258 let scope = plan_table_alias(scope, alias)?;
3259 Ok((expr, scope))
3260}
3261
3262fn plan_table_function_internal(
3267 qcx: &QueryContext,
3268 Function {
3269 name,
3270 args,
3271 filter,
3272 over,
3273 distinct,
3274 }: &Function<Aug>,
3275 with_ordinality: bool,
3276 table_name: Option<FullItemName>,
3277) -> Result<(HirRelationExpr, Scope), PlanError> {
3278 assert_none!(filter, "cannot parse table function with FILTER");
3279 assert_none!(over, "cannot parse table function with OVER");
3280 assert!(!*distinct, "cannot parse table function with DISTINCT");
3281
3282 let ecx = &ExprContext {
3283 qcx,
3284 name: "table function arguments",
3285 scope: &Scope::empty(),
3286 relation_type: &SqlRelationType::empty(),
3287 allow_aggregates: false,
3288 allow_subqueries: true,
3289 allow_parameters: true,
3290 allow_windows: false,
3291 };
3292
3293 let scalar_args = match args {
3294 FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3295 FunctionArgs::Args { args, order_by } => {
3296 if !order_by.is_empty() {
3297 sql_bail!(
3298 "ORDER BY specified, but {} is not an aggregate function",
3299 name
3300 );
3301 }
3302 plan_exprs(ecx, args)?
3303 }
3304 };
3305
3306 let table_name = match table_name {
3307 Some(table_name) => table_name.item,
3308 None => name.full_item_name().item.clone(),
3309 };
3310
3311 let scope_name = Some(PartialItemName {
3312 database: None,
3313 schema: None,
3314 item: table_name,
3315 });
3316
3317 let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3318 Func::Table(impls) => {
3319 let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3320 let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3321 let expr = match tf.imp {
3322 TableFuncImpl::CallTable { mut func, exprs } => {
3323 if with_ordinality {
3324 func = TableFunc::with_ordinality(func.clone()).ok_or(
3325 PlanError::Unsupported {
3326 feature: format!("WITH ORDINALITY on {}", func),
3327 discussion_no: None,
3328 },
3329 )?;
3330 }
3331 HirRelationExpr::CallTable { func, exprs }
3332 }
3333 TableFuncImpl::Expr(expr) => {
3334 if !with_ordinality {
3335 expr
3336 } else {
3337 if qcx
3341 .scx
3342 .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3343 {
3344 tracing::error!(
3348 %name,
3349 "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3350 );
3351 expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3352 func: WindowExprType::Scalar(ScalarWindowExpr {
3353 func: ScalarWindowFunc::RowNumber,
3354 order_by: vec![],
3355 }),
3356 partition_by: vec![],
3357 order_by: vec![],
3358 })])
3359 } else {
3360 bail_unsupported!(format!(
3361 "WITH ORDINALITY or ROWS FROM with {}",
3362 name
3363 ));
3364 }
3365 }
3366 }
3367 };
3368 (expr, scope)
3369 }
3370 Func::Scalar(impls) => {
3371 let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3372 let output = expr.typ(
3373 &qcx.outer_relation_types,
3374 &SqlRelationType::new(vec![]),
3375 &qcx.scx.param_types.borrow(),
3376 );
3377
3378 let relation = SqlRelationType::new(vec![output]);
3379
3380 let function_ident = Ident::new(name.full_item_name().item.clone())?;
3381 let column_name = normalize::column_name(function_ident);
3382 let name = column_name.to_string();
3383
3384 let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3385
3386 let mut func = TableFunc::TabletizedScalar { relation, name };
3387 if with_ordinality {
3388 func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3389 feature: format!("WITH ORDINALITY on {}", func),
3390 discussion_no: None,
3391 })?;
3392 }
3393 (
3394 HirRelationExpr::CallTable {
3395 func,
3396 exprs: vec![expr],
3397 },
3398 scope,
3399 )
3400 }
3401 o => sql_bail!(
3402 "{} functions are not supported in functions in FROM",
3403 o.class()
3404 ),
3405 };
3406
3407 if with_ordinality {
3408 scope
3409 .items
3410 .push(ScopeItem::from_name(scope_name, "ordinality"));
3411 }
3412
3413 Ok((expr, scope))
3414}
3415
3416fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3417 if let Some(TableAlias {
3418 name,
3419 columns,
3420 strict,
3421 }) = alias
3422 {
3423 if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3424 sql_bail!(
3425 "{} has {} columns available but {} columns specified",
3426 name,
3427 scope.items.len(),
3428 columns.len()
3429 );
3430 }
3431
3432 let table_name = normalize::ident(name.to_owned());
3433 for (i, item) in scope.items.iter_mut().enumerate() {
3434 item.table_name = if item.allow_unqualified_references {
3435 Some(PartialItemName {
3436 database: None,
3437 schema: None,
3438 item: table_name.clone(),
3439 })
3440 } else {
3441 None
3475 };
3476 item.column_name = columns
3477 .get(i)
3478 .map(|a| normalize::column_name(a.clone()))
3479 .unwrap_or_else(|| item.column_name.clone());
3480 }
3481 }
3482 Ok(scope)
3483}
3484
3485fn invent_column_name(
3489 ecx: &ExprContext,
3490 expr: &Expr<Aug>,
3491 table_func_names: &BTreeMap<String, Ident>,
3492) -> Option<ColumnName> {
3493 #[derive(Debug)]
3500 enum NameQuality {
3501 Low,
3502 High,
3503 }
3504
3505 fn invent(
3506 ecx: &ExprContext,
3507 expr: &Expr<Aug>,
3508 table_func_names: &BTreeMap<String, Ident>,
3509 ) -> Option<(ColumnName, NameQuality)> {
3510 match expr {
3511 Expr::Identifier(names) => {
3512 if let [name] = names.as_slice() {
3513 if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3514 return Some((
3515 normalize::column_name(table_func_name.clone()),
3516 NameQuality::High,
3517 ));
3518 }
3519 }
3520 names
3521 .last()
3522 .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3523 }
3524 Expr::Value(v) => match v {
3525 Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3528 Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3529 _ => None,
3530 },
3531 Expr::Function(func) => {
3532 let (schema, item) = match &func.name {
3533 ResolvedItemName::Item {
3534 qualifiers,
3535 full_name,
3536 ..
3537 } => (&qualifiers.schema_spec, full_name.item.clone()),
3538 _ => unreachable!(),
3539 };
3540
3541 if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3542 || schema
3543 == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3544 {
3545 None
3546 } else {
3547 Some((item.into(), NameQuality::High))
3548 }
3549 }
3550 Expr::HomogenizingFunction { function, .. } => Some((
3551 function.to_string().to_lowercase().into(),
3552 NameQuality::High,
3553 )),
3554 Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3555 Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3556 Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3557 Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3558 Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
3559 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3560 _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3561 },
3562 Expr::Case { else_result, .. } => {
3563 match else_result
3564 .as_ref()
3565 .and_then(|else_result| invent(ecx, else_result, table_func_names))
3566 {
3567 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3568 _ => Some(("case".into(), NameQuality::Low)),
3569 }
3570 }
3571 Expr::FieldAccess { field, .. } => {
3572 Some((normalize::column_name(field.clone()), NameQuality::High))
3573 }
3574 Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3575 Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
3576 Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3577 let (_expr, scope) =
3581 plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
3582 scope
3583 .items
3584 .first()
3585 .map(|name| (name.column_name.clone(), NameQuality::High))
3586 }
3587 Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3588 _ => None,
3589 }
3590 }
3591
3592 invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
3593}
3594
3595#[derive(Debug)]
3596enum ExpandedSelectItem<'a> {
3597 InputOrdinal(usize),
3598 Expr(Cow<'a, Expr<Aug>>),
3599}
3600
3601impl ExpandedSelectItem<'_> {
3602 fn as_expr(&self) -> Option<&Expr<Aug>> {
3603 match self {
3604 ExpandedSelectItem::InputOrdinal(_) => None,
3605 ExpandedSelectItem::Expr(expr) => Some(expr),
3606 }
3607 }
3608}
3609
3610fn expand_select_item<'a>(
3611 ecx: &ExprContext,
3612 s: &'a SelectItem<Aug>,
3613 table_func_names: &BTreeMap<String, Ident>,
3614) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3615 match s {
3616 SelectItem::Expr {
3617 expr: Expr::QualifiedWildcard(table_name),
3618 alias: _,
3619 } => {
3620 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3621 let table_name =
3622 normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3623 let out: Vec<_> = ecx
3624 .scope
3625 .items
3626 .iter()
3627 .enumerate()
3628 .filter(|(_i, item)| item.is_from_table(&table_name))
3629 .map(|(i, item)| {
3630 let name = item.column_name.clone();
3631 (ExpandedSelectItem::InputOrdinal(i), name)
3632 })
3633 .collect();
3634 if out.is_empty() {
3635 sql_bail!("no table named '{}' in scope", table_name);
3636 }
3637 Ok(out)
3638 }
3639 SelectItem::Expr {
3640 expr: Expr::WildcardAccess(sql_expr),
3641 alias: _,
3642 } => {
3643 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3644 let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3650 let fields = match ecx.scalar_type(&expr) {
3651 SqlScalarType::Record { fields, .. } => fields,
3652 ty => sql_bail!(
3653 "type {} is not composite",
3654 ecx.humanize_scalar_type(&ty, false)
3655 ),
3656 };
3657 let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3658 if let Expr::Identifier(ident) = sql_expr.as_ref() {
3659 if let [name] = ident.as_slice() {
3660 if let Ok(items) = ecx.scope.items_from_table(
3661 &[],
3662 &PartialItemName {
3663 database: None,
3664 schema: None,
3665 item: name.as_str().to_string(),
3666 },
3667 ) {
3668 for (_, item) in items {
3669 if item
3670 .is_exists_column_for_a_table_function_that_was_in_the_target_list
3671 {
3672 skip_cols.insert(item.column_name.clone());
3673 }
3674 }
3675 }
3676 }
3677 }
3678 let items = fields
3679 .iter()
3680 .filter_map(|(name, _ty)| {
3681 if skip_cols.contains(name) {
3682 None
3683 } else {
3684 let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3685 expr: sql_expr.clone(),
3686 field: name.clone().into(),
3687 }));
3688 Some((item, name.clone()))
3689 }
3690 })
3691 .collect();
3692 Ok(items)
3693 }
3694 SelectItem::Wildcard => {
3695 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3696 let items: Vec<_> = ecx
3697 .scope
3698 .items
3699 .iter()
3700 .enumerate()
3701 .filter(|(_i, item)| item.allow_unqualified_references)
3702 .map(|(i, item)| {
3703 let name = item.column_name.clone();
3704 (ExpandedSelectItem::InputOrdinal(i), name)
3705 })
3706 .collect();
3707
3708 Ok(items)
3709 }
3710 SelectItem::Expr { expr, alias } => {
3711 let name = alias
3712 .clone()
3713 .map(normalize::column_name)
3714 .or_else(|| invent_column_name(ecx, expr, table_func_names))
3715 .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into());
3716 Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3717 }
3718 }
3719}
3720
3721fn plan_join(
3722 left_qcx: &QueryContext,
3723 left: HirRelationExpr,
3724 left_scope: Scope,
3725 join: &Join<Aug>,
3726) -> Result<(HirRelationExpr, Scope), PlanError> {
3727 const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3728 let (kind, constraint) = match &join.join_operator {
3729 JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3730 JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3731 JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3732 JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3733 JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3734 };
3735
3736 let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3737 if !kind.can_be_correlated() {
3738 for item in &mut right_qcx.outer_scopes[0].items {
3739 item.error_if_referenced =
3744 Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3745 table: table.cloned(),
3746 column: column.clone(),
3747 });
3748 }
3749 }
3750 let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3751
3752 let (expr, scope) = match constraint {
3753 JoinConstraint::On(expr) => {
3754 let product_scope = left_scope.product(right_scope)?;
3755 let ecx = &ExprContext {
3756 qcx: left_qcx,
3757 name: "ON clause",
3758 scope: &product_scope,
3759 relation_type: &SqlRelationType::new(
3760 left_qcx
3761 .relation_type(&left)
3762 .column_types
3763 .into_iter()
3764 .chain(right_qcx.relation_type(&right).column_types)
3765 .collect(),
3766 ),
3767 allow_aggregates: false,
3768 allow_subqueries: true,
3769 allow_parameters: true,
3770 allow_windows: false,
3771 };
3772 let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3773 let joined = left.join(right, on, kind);
3774 (joined, product_scope)
3775 }
3776 JoinConstraint::Using { columns, alias } => {
3777 let column_names = columns
3778 .iter()
3779 .map(|ident| normalize::column_name(ident.clone()))
3780 .collect::<Vec<_>>();
3781
3782 plan_using_constraint(
3783 &column_names,
3784 left_qcx,
3785 left,
3786 left_scope,
3787 &right_qcx,
3788 right,
3789 right_scope,
3790 kind,
3791 alias.as_ref(),
3792 )?
3793 }
3794 JoinConstraint::Natural => {
3795 *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3798 *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3799 let left_column_names = left_scope.column_names();
3800 let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3801 let column_names: Vec<_> = left_column_names
3802 .filter(|col| right_column_names.contains(col))
3803 .cloned()
3804 .collect();
3805 plan_using_constraint(
3806 &column_names,
3807 left_qcx,
3808 left,
3809 left_scope,
3810 &right_qcx,
3811 right,
3812 right_scope,
3813 kind,
3814 None,
3815 )?
3816 }
3817 };
3818 Ok((expr, scope))
3819}
3820
3821#[allow(clippy::too_many_arguments)]
3823fn plan_using_constraint(
3824 column_names: &[ColumnName],
3825 left_qcx: &QueryContext,
3826 left: HirRelationExpr,
3827 left_scope: Scope,
3828 right_qcx: &QueryContext,
3829 right: HirRelationExpr,
3830 right_scope: Scope,
3831 kind: JoinKind,
3832 alias: Option<&Ident>,
3833) -> Result<(HirRelationExpr, Scope), PlanError> {
3834 let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3835
3836 let mut unique_column_names = BTreeSet::new();
3839 for c in column_names {
3840 if !unique_column_names.insert(c) {
3841 return Err(PlanError::Unsupported {
3842 feature: format!(
3843 "column name {} appears more than once in USING clause",
3844 c.quoted()
3845 ),
3846 discussion_no: None,
3847 });
3848 }
3849 }
3850
3851 let alias_item_name = alias.map(|alias| PartialItemName {
3852 database: None,
3853 schema: None,
3854 item: alias.clone().to_string(),
3855 });
3856
3857 if let Some(alias_item_name) = &alias_item_name {
3858 for partial_item_name in both_scope.table_names() {
3859 if partial_item_name.matches(alias_item_name) {
3860 sql_bail!(
3861 "table name \"{}\" specified more than once",
3862 alias_item_name
3863 )
3864 }
3865 }
3866 }
3867
3868 let ecx = &ExprContext {
3869 qcx: right_qcx,
3870 name: "USING clause",
3871 scope: &both_scope,
3872 relation_type: &SqlRelationType::new(
3873 left_qcx
3874 .relation_type(&left)
3875 .column_types
3876 .into_iter()
3877 .chain(right_qcx.relation_type(&right).column_types)
3878 .collect(),
3879 ),
3880 allow_aggregates: false,
3881 allow_subqueries: false,
3882 allow_parameters: false,
3883 allow_windows: false,
3884 };
3885
3886 let mut join_exprs = vec![];
3887 let mut map_exprs = vec![];
3888 let mut new_items = vec![];
3889 let mut join_cols = vec![];
3890 let mut hidden_cols = vec![];
3891
3892 for column_name in column_names {
3893 let (lhs, lhs_name) = left_scope.resolve_using_column(
3895 column_name,
3896 JoinSide::Left,
3897 &mut left_qcx.name_manager.borrow_mut(),
3898 )?;
3899 let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3900 column_name,
3901 JoinSide::Right,
3902 &mut right_qcx.name_manager.borrow_mut(),
3903 )?;
3904
3905 rhs.column += left_scope.len();
3907
3908 let mut exprs = coerce_homogeneous_exprs(
3910 &ecx.with_name(&format!(
3911 "NATURAL/USING join column {}",
3912 column_name.quoted()
3913 )),
3914 vec![
3915 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3916 lhs,
3917 Arc::clone(&lhs_name),
3918 )),
3919 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3920 rhs,
3921 Arc::clone(&rhs_name),
3922 )),
3923 ],
3924 None,
3925 )?;
3926 let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3927
3928 match kind {
3929 JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3930 join_cols.push(lhs.column);
3931 hidden_cols.push(rhs.column);
3932 }
3933 JoinKind::RightOuter => {
3934 join_cols.push(rhs.column);
3935 hidden_cols.push(lhs.column);
3936 }
3937 JoinKind::FullOuter => {
3938 join_cols.push(both_scope.items.len() + map_exprs.len());
3941 hidden_cols.push(lhs.column);
3942 hidden_cols.push(rhs.column);
3943 map_exprs.push(HirScalarExpr::call_variadic(
3944 VariadicFunc::Coalesce,
3945 vec![expr1.clone(), expr2.clone()],
3946 ));
3947 new_items.push(ScopeItem::from_column_name(column_name));
3948 }
3949 }
3950
3951 if alias_item_name.is_some() {
3956 let new_item_col = both_scope.items.len() + new_items.len();
3957 join_cols.push(new_item_col);
3958 hidden_cols.push(new_item_col);
3959
3960 new_items.push(ScopeItem::from_name(
3961 alias_item_name.clone(),
3962 column_name.clone().to_string(),
3963 ));
3964
3965 map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
3969 }
3970
3971 join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
3972 }
3973 both_scope.items.extend(new_items);
3974
3975 for c in hidden_cols {
3979 both_scope.items[c].allow_unqualified_references = false;
3980 }
3981
3982 let project_key = join_cols
3984 .into_iter()
3985 .chain(0..both_scope.items.len())
3986 .unique()
3987 .collect::<Vec<_>>();
3988
3989 both_scope = both_scope.project(&project_key);
3990
3991 let on = HirScalarExpr::variadic_and(join_exprs);
3992
3993 let both = left
3994 .join(right, on, kind)
3995 .map(map_exprs)
3996 .project(project_key);
3997 Ok((both, both_scope))
3998}
3999
4000pub fn plan_expr<'a>(
4001 ecx: &'a ExprContext,
4002 e: &Expr<Aug>,
4003) -> Result<CoercibleScalarExpr, PlanError> {
4004 ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
4005}
4006
4007fn plan_expr_inner<'a>(
4008 ecx: &'a ExprContext,
4009 e: &Expr<Aug>,
4010) -> Result<CoercibleScalarExpr, PlanError> {
4011 if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4012 return Ok(HirScalarExpr::named_column(
4014 i,
4015 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4016 )
4017 .into());
4018 }
4019
4020 match e {
4021 Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4023 Ok(plan_identifier(ecx, names)?.into())
4024 }
4025
4026 Expr::Value(val) => plan_literal(val),
4028 Expr::Parameter(n) => plan_parameter(ecx, *n),
4029 Expr::Array(exprs) => plan_array(ecx, exprs, None),
4030 Expr::List(exprs) => plan_list(ecx, exprs, None),
4031 Expr::Map(exprs) => plan_map(ecx, exprs, None),
4032 Expr::Row { exprs } => plan_row(ecx, exprs),
4033
4034 Expr::Op { op, expr1, expr2 } => {
4036 Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4037 }
4038 Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4039 Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4040
4041 Expr::Not { expr } => plan_not(ecx, expr),
4043 Expr::And { left, right } => plan_and(ecx, left, right),
4044 Expr::Or { left, right } => plan_or(ecx, left, right),
4045 Expr::IsExpr {
4046 expr,
4047 construct,
4048 negated,
4049 } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4050 Expr::Case {
4051 operand,
4052 conditions,
4053 results,
4054 else_result,
4055 } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4056 Expr::HomogenizingFunction { function, exprs } => {
4057 plan_homogenizing_function(ecx, function, exprs)
4058 }
4059 Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4060 ecx,
4061 &None,
4062 &[l_expr.clone().equals(*r_expr.clone())],
4063 &[Expr::null()],
4064 &Some(Box::new(*l_expr.clone())),
4065 )?
4066 .into()),
4067 Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4068 Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4069 Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4070 Expr::Like {
4071 expr,
4072 pattern,
4073 escape,
4074 case_insensitive,
4075 negated,
4076 } => Ok(plan_like(
4077 ecx,
4078 expr,
4079 pattern,
4080 escape.as_deref(),
4081 *case_insensitive,
4082 *negated,
4083 )?
4084 .into()),
4085
4086 Expr::InList {
4087 expr,
4088 list,
4089 negated,
4090 } => plan_in_list(ecx, expr, list, negated),
4091
4092 Expr::Exists(query) => plan_exists(ecx, query),
4094 Expr::Subquery(query) => plan_subquery(ecx, query),
4095 Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4096 Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4097 Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4098 Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4099 Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
4100 Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
4101 Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
4102 Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
4103 Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
4104 Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
4105 Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
4106 }
4107}
4108
4109fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4110 if !ecx.allow_parameters {
4111 return Err(PlanError::UnknownParameter(n));
4115 }
4116 if n == 0 || n > 65536 {
4117 return Err(PlanError::UnknownParameter(n));
4118 }
4119 if ecx.param_types().borrow().contains_key(&n) {
4120 Ok(HirScalarExpr::parameter(n).into())
4121 } else {
4122 Ok(CoercibleScalarExpr::Parameter(n))
4123 }
4124}
4125
4126fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4127 let mut out = vec![];
4128 for e in exprs {
4129 out.push(plan_expr(ecx, e)?);
4130 }
4131 Ok(CoercibleScalarExpr::LiteralRecord(out))
4132}
4133
4134fn plan_cast(
4135 ecx: &ExprContext,
4136 expr: &Expr<Aug>,
4137 data_type: &ResolvedDataType,
4138) -> Result<CoercibleScalarExpr, PlanError> {
4139 let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4140 let expr = match expr {
4141 Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4150 Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4151 Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4152 _ => plan_expr(ecx, expr)?,
4153 };
4154 let ecx = &ecx.with_name("CAST");
4155 let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4156 let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4157 Ok(expr.into())
4158}
4159
4160fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4161 let ecx = ecx.with_name("NOT argument");
4162 Ok(plan_expr(&ecx, expr)?
4163 .type_as(&ecx, &SqlScalarType::Bool)?
4164 .call_unary(UnaryFunc::Not(expr_func::Not))
4165 .into())
4166}
4167
4168fn plan_and(
4169 ecx: &ExprContext,
4170 left: &Expr<Aug>,
4171 right: &Expr<Aug>,
4172) -> Result<CoercibleScalarExpr, PlanError> {
4173 let ecx = ecx.with_name("AND argument");
4174 Ok(HirScalarExpr::variadic_and(vec![
4175 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4176 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4177 ])
4178 .into())
4179}
4180
4181fn plan_or(
4182 ecx: &ExprContext,
4183 left: &Expr<Aug>,
4184 right: &Expr<Aug>,
4185) -> Result<CoercibleScalarExpr, PlanError> {
4186 let ecx = ecx.with_name("OR argument");
4187 Ok(HirScalarExpr::variadic_or(vec![
4188 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4189 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4190 ])
4191 .into())
4192}
4193
4194fn plan_in_list(
4195 ecx: &ExprContext,
4196 lhs: &Expr<Aug>,
4197 list: &Vec<Expr<Aug>>,
4198 negated: &bool,
4199) -> Result<CoercibleScalarExpr, PlanError> {
4200 let ecx = ecx.with_name("IN list");
4201 let or = HirScalarExpr::variadic_or(
4202 list.into_iter()
4203 .map(|e| {
4204 let eq = lhs.clone().equals(e.clone());
4205 plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4206 })
4207 .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4208 );
4209 Ok(if *negated {
4210 or.call_unary(UnaryFunc::Not(expr_func::Not))
4211 } else {
4212 or
4213 }
4214 .into())
4215}
4216
4217fn plan_homogenizing_function(
4218 ecx: &ExprContext,
4219 function: &HomogenizingFunction,
4220 exprs: &[Expr<Aug>],
4221) -> Result<CoercibleScalarExpr, PlanError> {
4222 assert!(!exprs.is_empty()); let expr = HirScalarExpr::call_variadic(
4224 match function {
4225 HomogenizingFunction::Coalesce => VariadicFunc::Coalesce,
4226 HomogenizingFunction::Greatest => VariadicFunc::Greatest,
4227 HomogenizingFunction::Least => VariadicFunc::Least,
4228 },
4229 coerce_homogeneous_exprs(
4230 &ecx.with_name(&function.to_string().to_lowercase()),
4231 plan_exprs(ecx, exprs)?,
4232 None,
4233 )?,
4234 );
4235 Ok(expr.into())
4236}
4237
4238fn plan_field_access(
4239 ecx: &ExprContext,
4240 expr: &Expr<Aug>,
4241 field: &Ident,
4242) -> Result<CoercibleScalarExpr, PlanError> {
4243 let field = normalize::column_name(field.clone());
4244 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4245 let ty = ecx.scalar_type(&expr);
4246 let i = match &ty {
4247 SqlScalarType::Record { fields, .. } => {
4248 fields.iter().position(|(name, _ty)| *name == field)
4249 }
4250 ty => sql_bail!(
4251 "column notation applied to type {}, which is not a composite type",
4252 ecx.humanize_scalar_type(ty, false)
4253 ),
4254 };
4255 match i {
4256 None => sql_bail!(
4257 "field {} not found in data type {}",
4258 field,
4259 ecx.humanize_scalar_type(&ty, false)
4260 ),
4261 Some(i) => Ok(expr
4262 .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4263 .into()),
4264 }
4265}
4266
4267fn plan_subscript(
4268 ecx: &ExprContext,
4269 expr: &Expr<Aug>,
4270 positions: &[SubscriptPosition<Aug>],
4271) -> Result<CoercibleScalarExpr, PlanError> {
4272 assert!(
4273 !positions.is_empty(),
4274 "subscript expression must contain at least one position"
4275 );
4276
4277 let ecx = &ecx.with_name("subscripting");
4278 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4279 let ty = ecx.scalar_type(&expr);
4280 match &ty {
4281 SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4282 ecx,
4283 expr,
4284 positions,
4285 if ty == SqlScalarType::Int2Vector {
4289 1
4290 } else {
4291 0
4292 },
4293 ),
4294 SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4295 SqlScalarType::List { element_type, .. } => {
4296 let elem_type_name = ecx.humanize_scalar_type(element_type, false);
4298 let n_layers = ty.unwrap_list_n_layers();
4299 plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4300 }
4301 ty => sql_bail!(
4302 "cannot subscript type {}",
4303 ecx.humanize_scalar_type(ty, false)
4304 ),
4305 }
4306}
4307
4308fn extract_scalar_subscript_from_positions<'a>(
4312 positions: &'a [SubscriptPosition<Aug>],
4313 expr_type_name: &str,
4314) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4315 let mut scalar_subscripts = Vec::with_capacity(positions.len());
4316 for p in positions {
4317 if p.explicit_slice {
4318 sql_bail!("{} subscript does not support slices", expr_type_name);
4319 }
4320 assert!(
4321 p.end.is_none(),
4322 "index-appearing subscripts cannot have end value"
4323 );
4324 scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4325 }
4326 Ok(scalar_subscripts)
4327}
4328
4329fn plan_subscript_array(
4330 ecx: &ExprContext,
4331 expr: HirScalarExpr,
4332 positions: &[SubscriptPosition<Aug>],
4333 offset: i64,
4334) -> Result<CoercibleScalarExpr, PlanError> {
4335 let mut exprs = Vec::with_capacity(positions.len() + 1);
4336 exprs.push(expr);
4337
4338 let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4341
4342 for i in indexes {
4343 exprs.push(plan_expr(ecx, i)?.cast_to(
4344 ecx,
4345 CastContext::Explicit,
4346 &SqlScalarType::Int64,
4347 )?);
4348 }
4349
4350 Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayIndex { offset }, exprs).into())
4351}
4352
4353fn plan_subscript_list(
4354 ecx: &ExprContext,
4355 mut expr: HirScalarExpr,
4356 positions: &[SubscriptPosition<Aug>],
4357 mut remaining_layers: usize,
4358 elem_type_name: &str,
4359) -> Result<CoercibleScalarExpr, PlanError> {
4360 let mut i = 0;
4361
4362 while i < positions.len() {
4363 let j = positions[i..]
4365 .iter()
4366 .position(|p| p.explicit_slice)
4367 .unwrap_or(positions.len() - i);
4368 if j != 0 {
4369 let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4370 let (n, e) = plan_index_list(
4371 ecx,
4372 expr,
4373 indexes.as_slice(),
4374 remaining_layers,
4375 elem_type_name,
4376 )?;
4377 remaining_layers = n;
4378 expr = e;
4379 i += j;
4380 }
4381
4382 let j = positions[i..]
4384 .iter()
4385 .position(|p| !p.explicit_slice)
4386 .unwrap_or(positions.len() - i);
4387 if j != 0 {
4388 expr = plan_slice_list(
4389 ecx,
4390 expr,
4391 &positions[i..i + j],
4392 remaining_layers,
4393 elem_type_name,
4394 )?;
4395 i += j;
4396 }
4397 }
4398
4399 Ok(expr.into())
4400}
4401
4402fn plan_index_list(
4403 ecx: &ExprContext,
4404 expr: HirScalarExpr,
4405 indexes: &[&Expr<Aug>],
4406 n_layers: usize,
4407 elem_type_name: &str,
4408) -> Result<(usize, HirScalarExpr), PlanError> {
4409 let depth = indexes.len();
4410
4411 if depth > n_layers {
4412 if n_layers == 0 {
4413 sql_bail!("cannot subscript type {}", elem_type_name)
4414 } else {
4415 sql_bail!(
4416 "cannot index into {} layers; list only has {} layer{}",
4417 depth,
4418 n_layers,
4419 if n_layers == 1 { "" } else { "s" }
4420 )
4421 }
4422 }
4423
4424 let mut exprs = Vec::with_capacity(depth + 1);
4425 exprs.push(expr);
4426
4427 for i in indexes {
4428 exprs.push(plan_expr(ecx, i)?.cast_to(
4429 ecx,
4430 CastContext::Explicit,
4431 &SqlScalarType::Int64,
4432 )?);
4433 }
4434
4435 Ok((
4436 n_layers - depth,
4437 HirScalarExpr::call_variadic(VariadicFunc::ListIndex, exprs),
4438 ))
4439}
4440
4441fn plan_slice_list(
4442 ecx: &ExprContext,
4443 expr: HirScalarExpr,
4444 slices: &[SubscriptPosition<Aug>],
4445 n_layers: usize,
4446 elem_type_name: &str,
4447) -> Result<HirScalarExpr, PlanError> {
4448 if n_layers == 0 {
4449 sql_bail!("cannot subscript type {}", elem_type_name)
4450 }
4451
4452 let mut exprs = Vec::with_capacity(slices.len() + 1);
4454 exprs.push(expr);
4455 let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4457 Ok(match position {
4458 Some(p) => {
4459 plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4460 }
4461 None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4462 })
4463 };
4464 for p in slices {
4465 let start = extract_position_or_default(p.start.as_ref(), 1)?;
4466 let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4467 exprs.push(start);
4468 exprs.push(end);
4469 }
4470
4471 Ok(HirScalarExpr::call_variadic(
4472 VariadicFunc::ListSliceLinear,
4473 exprs,
4474 ))
4475}
4476
4477fn plan_like(
4478 ecx: &ExprContext,
4479 expr: &Expr<Aug>,
4480 pattern: &Expr<Aug>,
4481 escape: Option<&Expr<Aug>>,
4482 case_insensitive: bool,
4483 not: bool,
4484) -> Result<HirScalarExpr, PlanError> {
4485 use CastContext::Implicit;
4486 let ecx = ecx.with_name("LIKE argument");
4487 let expr = plan_expr(&ecx, expr)?;
4488 let haystack = match ecx.scalar_type(&expr) {
4489 CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4490 .type_as(&ecx, ty)?
4491 .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4492 _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4493 };
4494 let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4495 if let Some(escape) = escape {
4496 pattern = pattern.call_binary(
4497 plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4498 expr_func::LikeEscape,
4499 );
4500 }
4501 let func: BinaryFunc = if case_insensitive {
4502 expr_func::IsLikeMatchCaseInsensitive.into()
4503 } else {
4504 expr_func::IsLikeMatchCaseSensitive.into()
4505 };
4506 let like = haystack.call_binary(pattern, func);
4507 if not {
4508 Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4509 } else {
4510 Ok(like)
4511 }
4512}
4513
4514fn plan_subscript_jsonb(
4515 ecx: &ExprContext,
4516 expr: HirScalarExpr,
4517 positions: &[SubscriptPosition<Aug>],
4518) -> Result<CoercibleScalarExpr, PlanError> {
4519 use CastContext::Implicit;
4520 use SqlScalarType::{Int64, String};
4521
4522 let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4525
4526 let mut exprs = Vec::with_capacity(subscripts.len());
4527 for s in subscripts {
4528 let subscript = plan_expr(ecx, s)?;
4529 let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4530 subscript
4531 } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4532 typeconv::to_string(ecx, subscript)
4536 } else {
4537 sql_bail!("jsonb subscript type must be coercible to integer or text");
4538 };
4539 exprs.push(subscript);
4540 }
4541
4542 let expr = expr.call_binary(
4545 HirScalarExpr::call_variadic(
4546 VariadicFunc::ArrayCreate {
4547 elem_type: SqlScalarType::String,
4548 },
4549 exprs,
4550 ),
4551 expr_func::JsonbGetPath,
4552 );
4553 Ok(expr.into())
4554}
4555
4556fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4557 if !ecx.allow_subqueries {
4558 sql_bail!("{} does not allow subqueries", ecx.name)
4559 }
4560 let mut qcx = ecx.derived_query_context();
4561 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4562 Ok(expr.exists().into())
4563}
4564
4565fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4566 if !ecx.allow_subqueries {
4567 sql_bail!("{} does not allow subqueries", ecx.name)
4568 }
4569 let mut qcx = ecx.derived_query_context();
4570 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4571 let column_types = qcx.relation_type(&expr).column_types;
4572 if column_types.len() != 1 {
4573 sql_bail!(
4574 "Expected subselect to return 1 column, got {} columns",
4575 column_types.len()
4576 );
4577 }
4578 Ok(expr.select().into())
4579}
4580
4581fn plan_list_subquery(
4582 ecx: &ExprContext,
4583 query: &Query<Aug>,
4584) -> Result<CoercibleScalarExpr, PlanError> {
4585 plan_vector_like_subquery(
4586 ecx,
4587 query,
4588 |_| false,
4589 |elem_type| VariadicFunc::ListCreate { elem_type },
4590 |order_by| AggregateFunc::ListConcat { order_by },
4591 expr_func::ListListConcat.into(),
4592 |elem_type| {
4593 HirScalarExpr::literal(
4594 Datum::empty_list(),
4595 SqlScalarType::List {
4596 element_type: Box::new(elem_type),
4597 custom_id: None,
4598 },
4599 )
4600 },
4601 "list",
4602 )
4603}
4604
4605fn plan_array_subquery(
4606 ecx: &ExprContext,
4607 query: &Query<Aug>,
4608) -> Result<CoercibleScalarExpr, PlanError> {
4609 plan_vector_like_subquery(
4610 ecx,
4611 query,
4612 |elem_type| {
4613 matches!(
4614 elem_type,
4615 SqlScalarType::Char { .. }
4616 | SqlScalarType::Array { .. }
4617 | SqlScalarType::List { .. }
4618 | SqlScalarType::Map { .. }
4619 )
4620 },
4621 |elem_type| VariadicFunc::ArrayCreate { elem_type },
4622 |order_by| AggregateFunc::ArrayConcat { order_by },
4623 expr_func::ArrayArrayConcat.into(),
4624 |elem_type| {
4625 HirScalarExpr::literal(
4626 Datum::empty_array(),
4627 SqlScalarType::Array(Box::new(elem_type)),
4628 )
4629 },
4630 "[]",
4631 )
4632}
4633
4634fn plan_vector_like_subquery<F1, F2, F3, F4>(
4636 ecx: &ExprContext,
4637 query: &Query<Aug>,
4638 is_unsupported_type: F1,
4639 vector_create: F2,
4640 aggregate_concat: F3,
4641 binary_concat: BinaryFunc,
4642 empty_literal: F4,
4643 vector_type_string: &str,
4644) -> Result<CoercibleScalarExpr, PlanError>
4645where
4646 F1: Fn(&SqlScalarType) -> bool,
4647 F2: Fn(SqlScalarType) -> VariadicFunc,
4648 F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4649 F4: Fn(SqlScalarType) -> HirScalarExpr,
4650{
4651 if !ecx.allow_subqueries {
4652 sql_bail!("{} does not allow subqueries", ecx.name)
4653 }
4654
4655 let mut qcx = ecx.derived_query_context();
4656 let mut planned_query = plan_query(&mut qcx, query)?;
4657 if planned_query.limit.is_some()
4658 || !planned_query
4659 .offset
4660 .clone()
4661 .try_into_literal_int64()
4662 .is_ok_and(|offset| offset == 0)
4663 {
4664 planned_query.expr = HirRelationExpr::top_k(
4665 planned_query.expr,
4666 vec![],
4667 planned_query.order_by.clone(),
4668 planned_query.limit,
4669 planned_query.offset,
4670 planned_query.group_size_hints.limit_input_group_size,
4671 );
4672 }
4673
4674 if planned_query.project.len() != 1 {
4675 sql_bail!(
4676 "Expected subselect to return 1 column, got {} columns",
4677 planned_query.project.len()
4678 );
4679 }
4680
4681 let project_column = *planned_query.project.get(0).unwrap();
4682 let elem_type = qcx
4683 .relation_type(&planned_query.expr)
4684 .column_types
4685 .get(project_column)
4686 .cloned()
4687 .unwrap()
4688 .scalar_type();
4689
4690 if is_unsupported_type(&elem_type) {
4691 bail_unsupported!(format!(
4692 "cannot build array from subquery because return type {}{}",
4693 ecx.humanize_scalar_type(&elem_type, false),
4694 vector_type_string
4695 ));
4696 }
4697
4698 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4701 vector_create(elem_type.clone()),
4702 vec![HirScalarExpr::column(project_column)],
4703 ))
4704 .chain(
4705 planned_query
4706 .order_by
4707 .iter()
4708 .map(|co| HirScalarExpr::column(co.column)),
4709 )
4710 .collect();
4711
4712 let aggregation_projection = vec![0];
4716 let aggregation_order_by = planned_query
4717 .order_by
4718 .into_iter()
4719 .enumerate()
4720 .map(|(i, order)| ColumnOrder { column: i, ..order })
4721 .collect();
4722
4723 let reduced_expr = planned_query
4724 .expr
4725 .reduce(
4726 vec![],
4727 vec![AggregateExpr {
4728 func: aggregate_concat(aggregation_order_by),
4729 expr: Box::new(HirScalarExpr::call_variadic(
4730 VariadicFunc::RecordCreate {
4731 field_names: iter::repeat(ColumnName::from(""))
4732 .take(aggregation_exprs.len())
4733 .collect(),
4734 },
4735 aggregation_exprs,
4736 )),
4737 distinct: false,
4738 }],
4739 None,
4740 )
4741 .project(aggregation_projection);
4742
4743 Ok(reduced_expr
4745 .select()
4746 .call_binary(empty_literal(elem_type), binary_concat)
4747 .into())
4748}
4749
4750fn plan_map_subquery(
4751 ecx: &ExprContext,
4752 query: &Query<Aug>,
4753) -> Result<CoercibleScalarExpr, PlanError> {
4754 if !ecx.allow_subqueries {
4755 sql_bail!("{} does not allow subqueries", ecx.name)
4756 }
4757
4758 let mut qcx = ecx.derived_query_context();
4759 let mut query = plan_query(&mut qcx, query)?;
4760 if query.limit.is_some()
4761 || !query
4762 .offset
4763 .clone()
4764 .try_into_literal_int64()
4765 .is_ok_and(|offset| offset == 0)
4766 {
4767 query.expr = HirRelationExpr::top_k(
4768 query.expr,
4769 vec![],
4770 query.order_by.clone(),
4771 query.limit,
4772 query.offset,
4773 query.group_size_hints.limit_input_group_size,
4774 );
4775 }
4776 if query.project.len() != 2 {
4777 sql_bail!(
4778 "expected map subquery to return 2 columns, got {} columns",
4779 query.project.len()
4780 );
4781 }
4782
4783 let query_types = qcx.relation_type(&query.expr).column_types;
4784 let key_column = query.project[0];
4785 let key_type = query_types[key_column].clone().scalar_type();
4786 let value_column = query.project[1];
4787 let value_type = query_types[value_column].clone().scalar_type();
4788
4789 if key_type != SqlScalarType::String {
4790 sql_bail!("cannot build map from subquery because first column is not of type text");
4791 }
4792
4793 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4794 VariadicFunc::RecordCreate {
4795 field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4796 },
4797 vec![
4798 HirScalarExpr::column(key_column),
4799 HirScalarExpr::column(value_column),
4800 ],
4801 ))
4802 .chain(
4803 query
4804 .order_by
4805 .iter()
4806 .map(|co| HirScalarExpr::column(co.column)),
4807 )
4808 .collect();
4809
4810 let expr = query
4811 .expr
4812 .reduce(
4813 vec![],
4814 vec![AggregateExpr {
4815 func: AggregateFunc::MapAgg {
4816 order_by: query
4817 .order_by
4818 .into_iter()
4819 .enumerate()
4820 .map(|(i, order)| ColumnOrder { column: i, ..order })
4821 .collect(),
4822 value_type: value_type.clone(),
4823 },
4824 expr: Box::new(HirScalarExpr::call_variadic(
4825 VariadicFunc::RecordCreate {
4826 field_names: iter::repeat(ColumnName::from(""))
4827 .take(aggregation_exprs.len())
4828 .collect(),
4829 },
4830 aggregation_exprs,
4831 )),
4832 distinct: false,
4833 }],
4834 None,
4835 )
4836 .project(vec![0]);
4837
4838 let expr = HirScalarExpr::call_variadic(
4840 VariadicFunc::Coalesce,
4841 vec![
4842 expr.select(),
4843 HirScalarExpr::literal(
4844 Datum::empty_map(),
4845 SqlScalarType::Map {
4846 value_type: Box::new(value_type),
4847 custom_id: None,
4848 },
4849 ),
4850 ],
4851 );
4852
4853 Ok(expr.into())
4854}
4855
4856fn plan_collate(
4857 ecx: &ExprContext,
4858 expr: &Expr<Aug>,
4859 collation: &UnresolvedItemName,
4860) -> Result<CoercibleScalarExpr, PlanError> {
4861 if collation.0.len() == 2
4862 && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4863 && collation.0[1] == ident!("default")
4864 {
4865 plan_expr(ecx, expr)
4866 } else {
4867 bail_unsupported!("COLLATE");
4868 }
4869}
4870
4871fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4878where
4879 E: std::borrow::Borrow<Expr<Aug>>,
4880{
4881 let mut out = vec![];
4882 for expr in exprs {
4883 out.push(plan_expr(ecx, expr.borrow())?);
4884 }
4885 Ok(out)
4886}
4887
4888fn plan_array(
4890 ecx: &ExprContext,
4891 exprs: &[Expr<Aug>],
4892 type_hint: Option<&SqlScalarType>,
4893) -> Result<CoercibleScalarExpr, PlanError> {
4894 let mut out = vec![];
4896 for expr in exprs {
4897 out.push(match expr {
4898 Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4901 _ => plan_expr(ecx, expr)?,
4902 });
4903 }
4904
4905 let type_hint = match type_hint {
4907 Some(SqlScalarType::Array(elem_type)) => {
4912 let multidimensional = out.iter().any(|e| {
4913 matches!(
4914 ecx.scalar_type(e),
4915 CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4916 )
4917 });
4918 if multidimensional {
4919 type_hint
4920 } else {
4921 Some(&**elem_type)
4922 }
4923 }
4924 Some(_) => None,
4928 None => None,
4930 };
4931
4932 let (elem_type, exprs) = if exprs.is_empty() {
4934 if let Some(elem_type) = type_hint {
4935 (elem_type.clone(), vec![])
4936 } else {
4937 sql_bail!("cannot determine type of empty array");
4938 }
4939 } else {
4940 let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4941 (ecx.scalar_type(&out[0]), out)
4942 };
4943
4944 if matches!(
4950 elem_type,
4951 SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4952 ) {
4953 bail_unsupported!(format!("{}[]", ecx.humanize_scalar_type(&elem_type, false)));
4954 }
4955
4956 Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayCreate { elem_type }, exprs).into())
4957}
4958
4959fn plan_list(
4960 ecx: &ExprContext,
4961 exprs: &[Expr<Aug>],
4962 type_hint: Option<&SqlScalarType>,
4963) -> Result<CoercibleScalarExpr, PlanError> {
4964 let (elem_type, exprs) = if exprs.is_empty() {
4965 if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
4966 (element_type.without_modifiers(), vec![])
4967 } else {
4968 sql_bail!("cannot determine type of empty list");
4969 }
4970 } else {
4971 let type_hint = match type_hint {
4972 Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
4973 _ => None,
4974 };
4975
4976 let mut out = vec![];
4977 for expr in exprs {
4978 out.push(match expr {
4979 Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
4982 _ => plan_expr(ecx, expr)?,
4983 });
4984 }
4985 let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
4986 (ecx.scalar_type(&out[0]).without_modifiers(), out)
4987 };
4988
4989 if matches!(elem_type, SqlScalarType::Char { .. }) {
4990 bail_unsupported!("char list");
4991 }
4992
4993 Ok(HirScalarExpr::call_variadic(VariadicFunc::ListCreate { elem_type }, exprs).into())
4994}
4995
4996fn plan_map(
4997 ecx: &ExprContext,
4998 entries: &[MapEntry<Aug>],
4999 type_hint: Option<&SqlScalarType>,
5000) -> Result<CoercibleScalarExpr, PlanError> {
5001 let (value_type, exprs) = if entries.is_empty() {
5002 if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
5003 (value_type.without_modifiers(), vec![])
5004 } else {
5005 sql_bail!("cannot determine type of empty map");
5006 }
5007 } else {
5008 let type_hint = match type_hint {
5009 Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5010 _ => None,
5011 };
5012
5013 let mut keys = vec![];
5014 let mut values = vec![];
5015 for MapEntry { key, value } in entries {
5016 let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5017 let value = match value {
5018 Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5021 _ => plan_expr(ecx, value)?,
5022 };
5023 keys.push(key);
5024 values.push(value);
5025 }
5026 let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5027 let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5028 let out = itertools::interleave(keys, values).collect();
5029 (value_type, out)
5030 };
5031
5032 if matches!(value_type, SqlScalarType::Char { .. }) {
5033 bail_unsupported!("char map");
5034 }
5035
5036 let expr = HirScalarExpr::call_variadic(VariadicFunc::MapBuild { value_type }, exprs);
5037 Ok(expr.into())
5038}
5039
5040pub fn coerce_homogeneous_exprs(
5057 ecx: &ExprContext,
5058 exprs: Vec<CoercibleScalarExpr>,
5059 force_type: Option<&SqlScalarType>,
5060) -> Result<Vec<HirScalarExpr>, PlanError> {
5061 assert!(!exprs.is_empty());
5062
5063 let target_holder;
5064 let target = match force_type {
5065 Some(t) => t,
5066 None => {
5067 let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5068 target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5069 &target_holder
5070 }
5071 };
5072
5073 let mut out = Vec::new();
5075 for expr in exprs {
5076 let arg = typeconv::plan_coerce(ecx, expr, target)?;
5077 let ccx = match force_type {
5078 None => CastContext::Implicit,
5079 Some(_) => CastContext::Explicit,
5080 };
5081 match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5082 Ok(expr) => out.push(expr),
5083 Err(_) => sql_bail!(
5084 "{} could not convert type {} to {}",
5085 ecx.name,
5086 ecx.humanize_scalar_type(&ecx.scalar_type(&arg), false),
5087 ecx.humanize_scalar_type(target, false),
5088 ),
5089 }
5090 }
5091 Ok(out)
5092}
5093
5094pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5097 obe: &OrderByExpr<T>,
5098 column: usize,
5099) -> ColumnOrder {
5100 let desc = !obe.asc.unwrap_or(true);
5101 ColumnOrder {
5102 column,
5103 desc,
5104 nulls_last: obe.nulls_last.unwrap_or(!desc),
5107 }
5108}
5109
5110fn plan_function_order_by(
5118 ecx: &ExprContext,
5119 order_by: &[OrderByExpr<Aug>],
5120) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5121 let mut order_by_exprs = vec![];
5122 let mut col_orders = vec![];
5123 {
5124 for (i, obe) in order_by.iter().enumerate() {
5125 let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5129 order_by_exprs.push(expr);
5130 col_orders.push(resolve_desc_and_nulls_last(obe, i));
5131 }
5132 }
5133 Ok((order_by_exprs, col_orders))
5134}
5135
5136fn plan_aggregate_common(
5138 ecx: &ExprContext,
5139 Function::<Aug> {
5140 name,
5141 args,
5142 filter,
5143 over: _,
5144 distinct,
5145 }: &Function<Aug>,
5146) -> Result<AggregateExpr, PlanError> {
5147 let impls = match resolve_func(ecx, name, args)? {
5162 Func::Aggregate(impls) => impls,
5163 _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
5164 };
5165
5166 let (args, order_by) = match &args {
5175 FunctionArgs::Star => (vec![], vec![]),
5176 FunctionArgs::Args { args, order_by } => {
5177 if args.is_empty() {
5178 sql_bail!(
5179 "{}(*) must be used to call a parameterless aggregate function",
5180 ecx.qcx
5181 .scx
5182 .humanize_resolved_name(name)
5183 .expect("name actually resolved")
5184 );
5185 }
5186 let args = plan_exprs(ecx, args)?;
5187 (args, order_by.clone())
5188 }
5189 };
5190
5191 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5192
5193 let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5194 if let Some(filter) = &filter {
5195 let cond =
5205 plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5206 let expr_typ = ecx.scalar_type(&expr);
5207 expr = HirScalarExpr::if_then_else(
5208 cond,
5209 expr,
5210 HirScalarExpr::literal(func.identity_datum(), expr_typ),
5211 );
5212 }
5213
5214 let mut seen_outer = false;
5215 let mut seen_inner = false;
5216 #[allow(deprecated)]
5217 expr.visit_columns(0, &mut |depth, col| {
5218 if depth == 0 && col.level == 0 {
5219 seen_inner = true;
5220 } else if col.level > depth {
5221 seen_outer = true;
5222 }
5223 });
5224 if seen_outer && !seen_inner {
5225 bail_unsupported!(
5226 3720,
5227 "aggregate functions that refer exclusively to outer columns"
5228 );
5229 }
5230
5231 if func.is_order_sensitive() {
5234 let field_names = iter::repeat(ColumnName::from(""))
5235 .take(1 + order_by_exprs.len())
5236 .collect();
5237 let mut exprs = vec![expr];
5238 exprs.extend(order_by_exprs);
5239 expr = HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs);
5240 }
5241
5242 Ok(AggregateExpr {
5243 func,
5244 expr: Box::new(expr),
5245 distinct: *distinct,
5246 })
5247}
5248
5249fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
5250 let mut names = names.to_vec();
5251 let col_name = normalize::column_name(names.pop().unwrap());
5252
5253 if !names.is_empty() {
5255 let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
5256 let (i, i_name) = ecx.scope.resolve_table_column(
5257 &ecx.qcx.outer_scopes,
5258 &table_name,
5259 &col_name,
5260 &mut ecx.qcx.name_manager.borrow_mut(),
5261 )?;
5262 return Ok(HirScalarExpr::named_column(i, i_name));
5263 }
5264
5265 let similar_names = match ecx.scope.resolve_column(
5268 &ecx.qcx.outer_scopes,
5269 &col_name,
5270 &mut ecx.qcx.name_manager.borrow_mut(),
5271 ) {
5272 Ok((i, i_name)) => {
5273 return Ok(HirScalarExpr::named_column(i, i_name));
5274 }
5275 Err(PlanError::UnknownColumn { similar, .. }) => similar,
5276 Err(e) => return Err(e),
5277 };
5278
5279 let items = ecx.scope.items_from_table(
5282 &ecx.qcx.outer_scopes,
5283 &PartialItemName {
5284 database: None,
5285 schema: None,
5286 item: col_name.as_str().to_owned(),
5287 },
5288 )?;
5289 match items.as_slice() {
5290 [] => Err(PlanError::UnknownColumn {
5292 table: None,
5293 column: col_name,
5294 similar: similar_names,
5295 }),
5296 [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
5301 *column,
5302 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5303 )),
5304 _ => {
5307 let mut has_exists_column = None;
5308 let (exprs, field_names): (Vec<_>, Vec<_>) = items
5309 .into_iter()
5310 .filter_map(|(column, item)| {
5311 if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
5312 has_exists_column = Some(column);
5313 None
5314 } else {
5315 let expr = HirScalarExpr::named_column(
5316 column,
5317 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5318 );
5319 let name = item.column_name.clone();
5320 Some((expr, name))
5321 }
5322 })
5323 .unzip();
5324 let expr = if exprs.len() == 1 && has_exists_column.is_some() {
5326 exprs.into_element()
5327 } else {
5328 HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs)
5329 };
5330 if let Some(has_exists_column) = has_exists_column {
5331 Ok(HirScalarExpr::if_then_else(
5332 HirScalarExpr::unnamed_column(has_exists_column)
5333 .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
5334 HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
5335 expr,
5336 ))
5337 } else {
5338 Ok(expr)
5339 }
5340 }
5341 }
5342}
5343
5344fn plan_op(
5345 ecx: &ExprContext,
5346 op: &str,
5347 expr1: &Expr<Aug>,
5348 expr2: Option<&Expr<Aug>>,
5349) -> Result<HirScalarExpr, PlanError> {
5350 let impls = func::resolve_op(op)?;
5351 let args = match expr2 {
5352 None => plan_exprs(ecx, &[expr1])?,
5353 Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5354 };
5355 func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5356}
5357
5358fn plan_function<'a>(
5359 ecx: &ExprContext,
5360 f @ Function {
5361 name,
5362 args,
5363 filter,
5364 over,
5365 distinct,
5366 }: &'a Function<Aug>,
5367) -> Result<HirScalarExpr, PlanError> {
5368 let impls = match resolve_func(ecx, name, args)? {
5369 Func::Table(_) => {
5370 sql_bail!(
5371 "table functions are not allowed in {} (function {})",
5372 ecx.name,
5373 name
5374 );
5375 }
5376 Func::Scalar(impls) => {
5377 if over.is_some() {
5378 sql_bail!(
5379 "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
5380 );
5381 }
5382 impls
5383 }
5384 Func::ScalarWindow(impls) => {
5385 let (
5386 ignore_nulls,
5387 order_by_exprs,
5388 col_orders,
5389 _window_frame,
5390 partition_by,
5391 scalar_args,
5392 ) = plan_window_function_non_aggr(ecx, f)?;
5393
5394 if !scalar_args.is_empty() {
5398 if let ResolvedItemName::Item {
5399 full_name: FullItemName { item, .. },
5400 ..
5401 } = name
5402 {
5403 sql_bail!(
5404 "function {} has 0 parameters, but was called with {}",
5405 item,
5406 scalar_args.len()
5407 );
5408 }
5409 }
5410
5411 let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5416
5417 if ignore_nulls {
5418 bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5421 }
5422
5423 return Ok(HirScalarExpr::windowing(WindowExpr {
5424 func: WindowExprType::Scalar(ScalarWindowExpr {
5425 func,
5426 order_by: col_orders,
5427 }),
5428 partition_by,
5429 order_by: order_by_exprs,
5430 }));
5431 }
5432 Func::ValueWindow(impls) => {
5433 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, scalar_args) =
5434 plan_window_function_non_aggr(ecx, f)?;
5435
5436 let (args_encoded, func) =
5437 func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5438
5439 if ignore_nulls {
5440 match func {
5441 ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
5442 _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
5443 }
5444 }
5445
5446 return Ok(HirScalarExpr::windowing(WindowExpr {
5447 func: WindowExprType::Value(ValueWindowExpr {
5448 func,
5449 args: Box::new(args_encoded),
5450 order_by: col_orders,
5451 window_frame,
5452 ignore_nulls, }),
5454 partition_by,
5455 order_by: order_by_exprs,
5456 }));
5457 }
5458 Func::Aggregate(_) => {
5459 if f.over.is_none() {
5460 if ecx.allow_aggregates {
5462 sql_bail!(
5465 "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
5466 name,
5467 );
5468 } else {
5469 sql_bail!(
5472 "aggregate functions are not allowed in {} (function {})",
5473 ecx.name,
5474 name
5475 );
5476 }
5477 } else {
5478 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
5479 plan_window_function_common(ecx, &f.name, &f.over)?;
5480
5481 match (&window_frame.start_bound, &window_frame.end_bound) {
5483 (
5484 mz_expr::WindowFrameBound::UnboundedPreceding,
5485 mz_expr::WindowFrameBound::OffsetPreceding(..),
5486 )
5487 | (
5488 mz_expr::WindowFrameBound::UnboundedPreceding,
5489 mz_expr::WindowFrameBound::OffsetFollowing(..),
5490 )
5491 | (
5492 mz_expr::WindowFrameBound::OffsetPreceding(..),
5493 mz_expr::WindowFrameBound::UnboundedFollowing,
5494 )
5495 | (
5496 mz_expr::WindowFrameBound::OffsetFollowing(..),
5497 mz_expr::WindowFrameBound::UnboundedFollowing,
5498 ) => bail_unsupported!("mixed unbounded - offset frames"),
5499 (_, _) => {} }
5501
5502 if ignore_nulls {
5503 bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5507 }
5508
5509 let aggregate_expr = plan_aggregate_common(ecx, f)?;
5510
5511 if aggregate_expr.distinct {
5512 bail_unsupported!("DISTINCT in window aggregates");
5514 }
5515
5516 return Ok(HirScalarExpr::windowing(WindowExpr {
5517 func: WindowExprType::Aggregate(AggregateWindowExpr {
5518 aggregate_expr,
5519 order_by: col_orders,
5520 window_frame,
5521 }),
5522 partition_by,
5523 order_by: order_by_exprs,
5524 }));
5525 }
5526 }
5527 };
5528
5529 if over.is_some() {
5530 unreachable!("If there is an OVER clause, we should have returned already above.");
5531 }
5532
5533 if *distinct {
5534 sql_bail!(
5535 "DISTINCT specified, but {} is not an aggregate function",
5536 ecx.qcx
5537 .scx
5538 .humanize_resolved_name(name)
5539 .expect("already resolved")
5540 );
5541 }
5542 if filter.is_some() {
5543 sql_bail!(
5544 "FILTER specified, but {} is not an aggregate function",
5545 ecx.qcx
5546 .scx
5547 .humanize_resolved_name(name)
5548 .expect("already resolved")
5549 );
5550 }
5551
5552 let scalar_args = match &args {
5553 FunctionArgs::Star => {
5554 sql_bail!(
5555 "* argument is invalid with non-aggregate function {}",
5556 ecx.qcx
5557 .scx
5558 .humanize_resolved_name(name)
5559 .expect("already resolved")
5560 )
5561 }
5562 FunctionArgs::Args { args, order_by } => {
5563 if !order_by.is_empty() {
5564 sql_bail!(
5565 "ORDER BY specified, but {} is not an aggregate function",
5566 ecx.qcx
5567 .scx
5568 .humanize_resolved_name(name)
5569 .expect("already resolved")
5570 );
5571 }
5572 plan_exprs(ecx, args)?
5573 }
5574 };
5575
5576 func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
5577}
5578
5579pub const IGNORE_NULLS_ERROR_MSG: &str =
5580 "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5581
5582pub fn resolve_func(
5586 ecx: &ExprContext,
5587 name: &ResolvedItemName,
5588 args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5589) -> Result<&'static Func, PlanError> {
5590 if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5591 if let Ok(f) = i.func() {
5592 return Ok(f);
5593 }
5594 }
5595
5596 let cexprs = match args {
5599 mz_sql_parser::ast::FunctionArgs::Star => vec![],
5600 mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5601 if !order_by.is_empty() {
5602 sql_bail!(
5603 "ORDER BY specified, but {} is not an aggregate function",
5604 name
5605 );
5606 }
5607 plan_exprs(ecx, args)?
5608 }
5609 };
5610
5611 let arg_types: Vec<_> = cexprs
5612 .into_iter()
5613 .map(|ty| match ecx.scalar_type(&ty) {
5614 CoercibleScalarType::Coerced(ty) => ecx.humanize_scalar_type(&ty, false),
5615 CoercibleScalarType::Record(_) => "record".to_string(),
5616 CoercibleScalarType::Uncoerced => "unknown".to_string(),
5617 })
5618 .collect();
5619
5620 Err(PlanError::UnknownFunction {
5621 name: name.to_string(),
5622 arg_types,
5623 })
5624}
5625
5626fn plan_is_expr<'a>(
5627 ecx: &ExprContext,
5628 expr: &'a Expr<Aug>,
5629 construct: &IsExprConstruct<Aug>,
5630 not: bool,
5631) -> Result<HirScalarExpr, PlanError> {
5632 let expr = plan_expr(ecx, expr)?;
5633 let mut expr = match construct {
5634 IsExprConstruct::Null => {
5635 let expr = expr.type_as_any(ecx)?;
5640 expr.call_is_null()
5641 }
5642 IsExprConstruct::Unknown => {
5643 let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5644 expr.call_is_null()
5645 }
5646 IsExprConstruct::True => {
5647 let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5648 expr.call_unary(UnaryFunc::IsTrue(expr_func::IsTrue))
5649 }
5650 IsExprConstruct::False => {
5651 let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5652 expr.call_unary(UnaryFunc::IsFalse(expr_func::IsFalse))
5653 }
5654 IsExprConstruct::DistinctFrom(expr2) => {
5655 let expr1 = expr.type_as_any(ecx)?;
5656 let expr2 = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5657 let term1 = HirScalarExpr::variadic_or(vec![
5664 expr1.clone().call_binary(expr2.clone(), expr_func::NotEq),
5665 expr1.clone().call_is_null(),
5666 expr2.clone().call_is_null(),
5667 ]);
5668 let term2 = HirScalarExpr::variadic_or(vec![
5669 expr1.call_is_null().not(),
5670 expr2.call_is_null().not(),
5671 ]);
5672 term1.and(term2)
5673 }
5674 };
5675 if not {
5676 expr = expr.not();
5677 }
5678 Ok(expr)
5679}
5680
5681fn plan_case<'a>(
5682 ecx: &ExprContext,
5683 operand: &'a Option<Box<Expr<Aug>>>,
5684 conditions: &'a [Expr<Aug>],
5685 results: &'a [Expr<Aug>],
5686 else_result: &'a Option<Box<Expr<Aug>>>,
5687) -> Result<HirScalarExpr, PlanError> {
5688 let mut cond_exprs = Vec::new();
5689 let mut result_exprs = Vec::new();
5690 for (c, r) in conditions.iter().zip_eq(results) {
5691 let c = match operand {
5692 Some(operand) => operand.clone().equals(c.clone()),
5693 None => c.clone(),
5694 };
5695 let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5696 cond_exprs.push(cexpr);
5697 result_exprs.push(r);
5698 }
5699 result_exprs.push(match else_result {
5700 Some(else_result) => else_result,
5701 None => &Expr::Value(Value::Null),
5702 });
5703 let mut result_exprs = coerce_homogeneous_exprs(
5704 &ecx.with_name("CASE"),
5705 plan_exprs(ecx, &result_exprs)?,
5706 None,
5707 )?;
5708 let mut expr = result_exprs.pop().unwrap();
5709 assert_eq!(cond_exprs.len(), result_exprs.len());
5710 for (cexpr, rexpr) in cond_exprs
5711 .into_iter()
5712 .rev()
5713 .zip_eq(result_exprs.into_iter().rev())
5714 {
5715 expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5716 }
5717 Ok(expr)
5718}
5719
5720fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5721 let (datum, scalar_type) = match l {
5722 Value::Number(s) => {
5723 let d = strconv::parse_numeric(s.as_str())?;
5724 if !s.contains(&['E', '.'][..]) {
5725 if let Ok(n) = d.0.try_into() {
5727 (Datum::Int32(n), SqlScalarType::Int32)
5728 } else if let Ok(n) = d.0.try_into() {
5729 (Datum::Int64(n), SqlScalarType::Int64)
5730 } else {
5731 (
5732 Datum::Numeric(d),
5733 SqlScalarType::Numeric { max_scale: None },
5734 )
5735 }
5736 } else {
5737 (
5738 Datum::Numeric(d),
5739 SqlScalarType::Numeric { max_scale: None },
5740 )
5741 }
5742 }
5743 Value::HexString(_) => bail_unsupported!("hex string literals"),
5744 Value::Boolean(b) => match b {
5745 false => (Datum::False, SqlScalarType::Bool),
5746 true => (Datum::True, SqlScalarType::Bool),
5747 },
5748 Value::Interval(i) => {
5749 let i = literal::plan_interval(i)?;
5750 (Datum::Interval(i), SqlScalarType::Interval)
5751 }
5752 Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5753 Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5754 };
5755 let expr = HirScalarExpr::literal(datum, scalar_type);
5756 Ok(expr.into())
5757}
5758
5759fn plan_window_function_non_aggr<'a>(
5762 ecx: &ExprContext,
5763 Function {
5764 name,
5765 args,
5766 filter,
5767 over,
5768 distinct,
5769 }: &'a Function<Aug>,
5770) -> Result<
5771 (
5772 bool,
5773 Vec<HirScalarExpr>,
5774 Vec<ColumnOrder>,
5775 mz_expr::WindowFrame,
5776 Vec<HirScalarExpr>,
5777 Vec<CoercibleScalarExpr>,
5778 ),
5779 PlanError,
5780> {
5781 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5782 plan_window_function_common(ecx, name, over)?;
5783
5784 if *distinct {
5785 sql_bail!(
5786 "DISTINCT specified, but {} is not an aggregate function",
5787 name
5788 );
5789 }
5790
5791 if filter.is_some() {
5792 bail_unsupported!("FILTER in non-aggregate window functions");
5793 }
5794
5795 let scalar_args = match &args {
5796 FunctionArgs::Star => {
5797 sql_bail!("* argument is invalid with non-aggregate function {}", name)
5798 }
5799 FunctionArgs::Args { args, order_by } => {
5800 if !order_by.is_empty() {
5801 sql_bail!(
5802 "ORDER BY specified, but {} is not an aggregate function",
5803 name
5804 );
5805 }
5806 plan_exprs(ecx, args)?
5807 }
5808 };
5809
5810 Ok((
5811 ignore_nulls,
5812 order_by_exprs,
5813 col_orders,
5814 window_frame,
5815 partition,
5816 scalar_args,
5817 ))
5818}
5819
5820fn plan_window_function_common(
5822 ecx: &ExprContext,
5823 name: &<Aug as AstInfo>::ItemName,
5824 over: &Option<WindowSpec<Aug>>,
5825) -> Result<
5826 (
5827 bool,
5828 Vec<HirScalarExpr>,
5829 Vec<ColumnOrder>,
5830 mz_expr::WindowFrame,
5831 Vec<HirScalarExpr>,
5832 ),
5833 PlanError,
5834> {
5835 if !ecx.allow_windows {
5836 sql_bail!(
5837 "window functions are not allowed in {} (function {})",
5838 ecx.name,
5839 name
5840 );
5841 }
5842
5843 let window_spec = match over.as_ref() {
5844 Some(over) => over,
5845 None => sql_bail!("window function {} requires an OVER clause", name),
5846 };
5847 if window_spec.ignore_nulls && window_spec.respect_nulls {
5848 sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5849 }
5850 let window_frame = match window_spec.window_frame.as_ref() {
5851 Some(frame) => plan_window_frame(frame)?,
5852 None => mz_expr::WindowFrame::default(),
5853 };
5854 let mut partition = Vec::new();
5855 for expr in &window_spec.partition_by {
5856 partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5857 }
5858
5859 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5860
5861 Ok((
5862 window_spec.ignore_nulls,
5863 order_by_exprs,
5864 col_orders,
5865 window_frame,
5866 partition,
5867 ))
5868}
5869
5870fn plan_window_frame(
5871 WindowFrame {
5872 units,
5873 start_bound,
5874 end_bound,
5875 }: &WindowFrame,
5876) -> Result<mz_expr::WindowFrame, PlanError> {
5877 use mz_expr::WindowFrameBound::*;
5878 let units = window_frame_unit_ast_to_expr(units)?;
5879 let start_bound = window_frame_bound_ast_to_expr(start_bound);
5880 let end_bound = end_bound
5881 .as_ref()
5882 .map(window_frame_bound_ast_to_expr)
5883 .unwrap_or(CurrentRow);
5884
5885 match (&start_bound, &end_bound) {
5887 (UnboundedFollowing, _) => {
5889 sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5890 }
5891 (_, UnboundedPreceding) => {
5893 sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5894 }
5895 (CurrentRow, OffsetPreceding(_)) => {
5897 sql_bail!("frame starting from current row cannot have preceding rows")
5898 }
5899 (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5900 sql_bail!("frame starting from following row cannot have preceding rows")
5901 }
5902 (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5905 if *o1 > 1000000 || *o2 > 1000000 {
5909 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5910 }
5911 }
5912 (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5913 if *o1 > 1000000 || *o2 > 1000000 {
5914 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5915 }
5916 }
5917 (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5918 if *o1 > 1000000 || *o2 > 1000000 {
5919 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5920 }
5921 }
5922 (OffsetPreceding(o), CurrentRow) => {
5923 if *o > 1000000 {
5924 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5925 }
5926 }
5927 (CurrentRow, OffsetFollowing(o)) => {
5928 if *o > 1000000 {
5929 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5930 }
5931 }
5932 (_, _) => (),
5934 }
5935
5936 if units == mz_expr::WindowFrameUnits::Range
5939 && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5940 {
5941 bail_unsupported!("RANGE in non-default window frames")
5942 }
5943
5944 let frame = mz_expr::WindowFrame {
5945 units,
5946 start_bound,
5947 end_bound,
5948 };
5949 Ok(frame)
5950}
5951
5952fn window_frame_unit_ast_to_expr(
5953 unit: &WindowFrameUnits,
5954) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5955 match unit {
5956 WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5957 WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5958 WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5959 }
5960}
5961
5962fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5963 match bound {
5964 WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5965 WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5966 WindowFrameBound::Preceding(Some(offset)) => {
5967 mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5968 }
5969 WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
5970 WindowFrameBound::Following(Some(offset)) => {
5971 mz_expr::WindowFrameBound::OffsetFollowing(*offset)
5972 }
5973 }
5974}
5975
5976pub fn scalar_type_from_sql(
5977 scx: &StatementContext,
5978 data_type: &ResolvedDataType,
5979) -> Result<SqlScalarType, PlanError> {
5980 match data_type {
5981 ResolvedDataType::AnonymousList(elem_type) => {
5982 let elem_type = scalar_type_from_sql(scx, elem_type)?;
5983 if matches!(elem_type, SqlScalarType::Char { .. }) {
5984 bail_unsupported!("char list");
5985 }
5986 Ok(SqlScalarType::List {
5987 element_type: Box::new(elem_type),
5988 custom_id: None,
5989 })
5990 }
5991 ResolvedDataType::AnonymousMap {
5992 key_type,
5993 value_type,
5994 } => {
5995 match scalar_type_from_sql(scx, key_type)? {
5996 SqlScalarType::String => {}
5997 other => sql_bail!(
5998 "map key type must be {}, got {}",
5999 scx.humanize_scalar_type(&SqlScalarType::String, false),
6000 scx.humanize_scalar_type(&other, false)
6001 ),
6002 }
6003 Ok(SqlScalarType::Map {
6004 value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
6005 custom_id: None,
6006 })
6007 }
6008 ResolvedDataType::Named { id, modifiers, .. } => {
6009 scalar_type_from_catalog(scx.catalog, *id, modifiers)
6010 }
6011 ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
6012 }
6013}
6014
6015pub fn scalar_type_from_catalog(
6016 catalog: &dyn SessionCatalog,
6017 id: CatalogItemId,
6018 modifiers: &[i64],
6019) -> Result<SqlScalarType, PlanError> {
6020 let entry = catalog.get_item(&id);
6021 let type_details = match entry.type_details() {
6022 Some(type_details) => type_details,
6023 None => {
6024 sql_bail!(
6027 "internal error: {} does not refer to a type",
6028 catalog.resolve_full_name(entry.name()).to_string().quoted()
6029 );
6030 }
6031 };
6032 match &type_details.typ {
6033 CatalogType::Numeric => {
6034 let mut modifiers = modifiers.iter().fuse();
6035 let precision = match modifiers.next() {
6036 Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6037 sql_bail!(
6038 "precision for type numeric must be between 1 and {}",
6039 NUMERIC_DATUM_MAX_PRECISION,
6040 );
6041 }
6042 Some(p) => Some(*p),
6043 None => None,
6044 };
6045 let scale = match modifiers.next() {
6046 Some(scale) => {
6047 if let Some(precision) = precision {
6048 if *scale > precision {
6049 sql_bail!(
6050 "scale for type numeric must be between 0 and precision {}",
6051 precision
6052 );
6053 }
6054 }
6055 Some(NumericMaxScale::try_from(*scale)?)
6056 }
6057 None => None,
6058 };
6059 if modifiers.next().is_some() {
6060 sql_bail!("type numeric supports at most two type modifiers");
6061 }
6062 Ok(SqlScalarType::Numeric { max_scale: scale })
6063 }
6064 CatalogType::Char => {
6065 let mut modifiers = modifiers.iter().fuse();
6066 let length = match modifiers.next() {
6067 Some(l) => Some(CharLength::try_from(*l)?),
6068 None => Some(CharLength::ONE),
6069 };
6070 if modifiers.next().is_some() {
6071 sql_bail!("type character supports at most one type modifier");
6072 }
6073 Ok(SqlScalarType::Char { length })
6074 }
6075 CatalogType::VarChar => {
6076 let mut modifiers = modifiers.iter().fuse();
6077 let length = match modifiers.next() {
6078 Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6079 None => None,
6080 };
6081 if modifiers.next().is_some() {
6082 sql_bail!("type character varying supports at most one type modifier");
6083 }
6084 Ok(SqlScalarType::VarChar { max_length: length })
6085 }
6086 CatalogType::Timestamp => {
6087 let mut modifiers = modifiers.iter().fuse();
6088 let precision = match modifiers.next() {
6089 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6090 None => None,
6091 };
6092 if modifiers.next().is_some() {
6093 sql_bail!("type timestamp supports at most one type modifier");
6094 }
6095 Ok(SqlScalarType::Timestamp { precision })
6096 }
6097 CatalogType::TimestampTz => {
6098 let mut modifiers = modifiers.iter().fuse();
6099 let precision = match modifiers.next() {
6100 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6101 None => None,
6102 };
6103 if modifiers.next().is_some() {
6104 sql_bail!("type timestamp with time zone supports at most one type modifier");
6105 }
6106 Ok(SqlScalarType::TimestampTz { precision })
6107 }
6108 t => {
6109 if !modifiers.is_empty() {
6110 sql_bail!(
6111 "{} does not support type modifiers",
6112 catalog.resolve_full_name(entry.name()).to_string()
6113 );
6114 }
6115 match t {
6116 CatalogType::Array {
6117 element_reference: element_id,
6118 } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6119 catalog,
6120 *element_id,
6121 modifiers,
6122 )?))),
6123 CatalogType::List {
6124 element_reference: element_id,
6125 element_modifiers,
6126 } => Ok(SqlScalarType::List {
6127 element_type: Box::new(scalar_type_from_catalog(
6128 catalog,
6129 *element_id,
6130 element_modifiers,
6131 )?),
6132 custom_id: Some(id),
6133 }),
6134 CatalogType::Map {
6135 key_reference: _,
6136 key_modifiers: _,
6137 value_reference: value_id,
6138 value_modifiers,
6139 } => Ok(SqlScalarType::Map {
6140 value_type: Box::new(scalar_type_from_catalog(
6141 catalog,
6142 *value_id,
6143 value_modifiers,
6144 )?),
6145 custom_id: Some(id),
6146 }),
6147 CatalogType::Range {
6148 element_reference: element_id,
6149 } => Ok(SqlScalarType::Range {
6150 element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6151 }),
6152 CatalogType::Record { fields } => {
6153 let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6154 .iter()
6155 .map(|f| {
6156 let scalar_type = scalar_type_from_catalog(
6157 catalog,
6158 f.type_reference,
6159 &f.type_modifiers,
6160 )?;
6161 Ok((
6162 f.name.clone(),
6163 SqlColumnType {
6164 scalar_type,
6165 nullable: true,
6166 },
6167 ))
6168 })
6169 .collect::<Result<Box<_>, PlanError>>()?;
6170 Ok(SqlScalarType::Record {
6171 fields: scalars,
6172 custom_id: Some(id),
6173 })
6174 }
6175 CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6176 CatalogType::Bool => Ok(SqlScalarType::Bool),
6177 CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6178 CatalogType::Date => Ok(SqlScalarType::Date),
6179 CatalogType::Float32 => Ok(SqlScalarType::Float32),
6180 CatalogType::Float64 => Ok(SqlScalarType::Float64),
6181 CatalogType::Int16 => Ok(SqlScalarType::Int16),
6182 CatalogType::Int32 => Ok(SqlScalarType::Int32),
6183 CatalogType::Int64 => Ok(SqlScalarType::Int64),
6184 CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6185 CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6186 CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6187 CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6188 CatalogType::Interval => Ok(SqlScalarType::Interval),
6189 CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6190 CatalogType::Oid => Ok(SqlScalarType::Oid),
6191 CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6192 CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6193 CatalogType::Pseudo => {
6194 sql_bail!(
6195 "cannot reference pseudo type {}",
6196 catalog.resolve_full_name(entry.name()).to_string()
6197 )
6198 }
6199 CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6200 CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6201 CatalogType::RegType => Ok(SqlScalarType::RegType),
6202 CatalogType::String => Ok(SqlScalarType::String),
6203 CatalogType::Time => Ok(SqlScalarType::Time),
6204 CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6205 CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6206 CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6207 CatalogType::Numeric => unreachable!("handled above"),
6208 CatalogType::Char => unreachable!("handled above"),
6209 CatalogType::VarChar => unreachable!("handled above"),
6210 CatalogType::Timestamp => unreachable!("handled above"),
6211 CatalogType::TimestampTz => unreachable!("handled above"),
6212 }
6213 }
6214 }
6215}
6216
6217struct AggregateTableFuncVisitor<'a> {
6220 scx: &'a StatementContext<'a>,
6221 aggs: Vec<Function<Aug>>,
6222 within_aggregate: bool,
6223 tables: BTreeMap<Function<Aug>, String>,
6224 table_disallowed_context: Vec<&'static str>,
6225 in_select_item: bool,
6226 id_gen: IdGen,
6227 err: Option<PlanError>,
6228}
6229
6230impl<'a> AggregateTableFuncVisitor<'a> {
6231 fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6232 AggregateTableFuncVisitor {
6233 scx,
6234 aggs: Vec::new(),
6235 within_aggregate: false,
6236 tables: BTreeMap::new(),
6237 table_disallowed_context: Vec::new(),
6238 in_select_item: false,
6239 id_gen: Default::default(),
6240 err: None,
6241 }
6242 }
6243
6244 fn into_result(
6245 self,
6246 ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6247 match self.err {
6248 Some(err) => Err(err),
6249 None => {
6250 let mut seen = BTreeSet::new();
6253 let aggs = self
6254 .aggs
6255 .into_iter()
6256 .filter(move |agg| seen.insert(agg.clone()))
6257 .collect();
6258 Ok((aggs, self.tables))
6259 }
6260 }
6261 }
6262}
6263
6264impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
6265 fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
6266 let item = match self.scx.get_item_by_resolved_name(&func.name) {
6267 Ok(i) => i,
6268 Err(_) => return,
6270 };
6271
6272 match item.func() {
6273 Ok(Func::Aggregate { .. }) if func.over.is_none() => {
6276 if self.within_aggregate {
6277 self.err = Some(sql_err!("nested aggregate functions are not allowed",));
6278 return;
6279 }
6280 self.aggs.push(func.clone());
6281 let Function {
6282 name: _,
6283 args,
6284 filter,
6285 over: _,
6286 distinct: _,
6287 } = func;
6288 if let Some(filter) = filter {
6289 self.visit_expr_mut(filter);
6290 }
6291 let old_within_aggregate = self.within_aggregate;
6292 self.within_aggregate = true;
6293 self.table_disallowed_context
6294 .push("aggregate function calls");
6295
6296 self.visit_function_args_mut(args);
6297
6298 self.within_aggregate = old_within_aggregate;
6299 self.table_disallowed_context.pop();
6300 }
6301 Ok(Func::Table { .. }) => {
6302 self.table_disallowed_context.push("other table functions");
6303 visit_mut::visit_function_mut(self, func);
6304 self.table_disallowed_context.pop();
6305 }
6306 _ => visit_mut::visit_function_mut(self, func),
6307 }
6308 }
6309
6310 fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
6311 }
6313
6314 fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
6315 let (disallowed_context, func) = match expr {
6316 Expr::Case { .. } => (Some("CASE"), None),
6317 Expr::HomogenizingFunction {
6318 function: HomogenizingFunction::Coalesce,
6319 ..
6320 } => (Some("COALESCE"), None),
6321 Expr::Function(func) if self.in_select_item => {
6322 let mut table_func = None;
6325 if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
6326 if let Ok(Func::Table { .. }) = item.func() {
6327 if let Some(context) = self.table_disallowed_context.last() {
6328 self.err = Some(sql_err!(
6329 "table functions are not allowed in {} (function {})",
6330 context,
6331 func.name
6332 ));
6333 return;
6334 }
6335 table_func = Some(func.clone());
6336 }
6337 }
6338 (None, table_func)
6341 }
6342 _ => (None, None),
6343 };
6344 if let Some(func) = func {
6345 visit_mut::visit_expr_mut(self, expr);
6347 if let Function {
6349 name: _,
6350 args: _,
6351 filter: None,
6352 over: None,
6353 distinct: false,
6354 } = &func
6355 {
6356 let unique_id = self.id_gen.allocate_id();
6358 let id = self
6359 .tables
6360 .entry(func)
6361 .or_insert_with(|| format!("table_func_{unique_id}"));
6362 *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
6365 }
6366 }
6367 if let Some(context) = disallowed_context {
6368 self.table_disallowed_context.push(context);
6369 }
6370
6371 visit_mut::visit_expr_mut(self, expr);
6372
6373 if disallowed_context.is_some() {
6374 self.table_disallowed_context.pop();
6375 }
6376 }
6377
6378 fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
6379 let old = self.in_select_item;
6380 self.in_select_item = true;
6381 visit_mut::visit_select_item_mut(self, si);
6382 self.in_select_item = old;
6383 }
6384}
6385
6386#[derive(Default)]
6387struct WindowFuncCollector {
6388 window_funcs: Vec<Expr<Aug>>,
6389}
6390
6391impl WindowFuncCollector {
6392 fn into_result(self) -> Vec<Expr<Aug>> {
6393 let mut seen = BTreeSet::new();
6395 let window_funcs_dedupped = self
6396 .window_funcs
6397 .into_iter()
6398 .filter(move |expr| seen.insert(expr.clone()))
6399 .rev()
6402 .collect();
6403 window_funcs_dedupped
6404 }
6405}
6406
6407impl Visit<'_, Aug> for WindowFuncCollector {
6408 fn visit_expr(&mut self, expr: &Expr<Aug>) {
6409 match expr {
6410 Expr::Function(func) => {
6411 if func.over.is_some() {
6412 self.window_funcs.push(expr.clone());
6413 }
6414 }
6415 _ => (),
6416 }
6417 visit::visit_expr(self, expr);
6418 }
6419
6420 fn visit_query(&mut self, _query: &Query<Aug>) {
6421 }
6423}
6424
6425#[derive(Debug, Eq, PartialEq, Copy, Clone)]
6427pub enum QueryLifetime {
6428 OneShot,
6430 Index,
6432 MaterializedView,
6434 Subscribe,
6436 View,
6438 Source,
6440}
6441
6442impl QueryLifetime {
6443 pub fn is_one_shot(&self) -> bool {
6447 let result = match self {
6448 QueryLifetime::OneShot => true,
6449 QueryLifetime::Index => false,
6450 QueryLifetime::MaterializedView => false,
6451 QueryLifetime::Subscribe => false,
6452 QueryLifetime::View => false,
6453 QueryLifetime::Source => false,
6454 };
6455 assert_eq!(!result, self.is_maintained());
6456 result
6457 }
6458
6459 pub fn is_maintained(&self) -> bool {
6462 match self {
6463 QueryLifetime::OneShot => false,
6464 QueryLifetime::Index => true,
6465 QueryLifetime::MaterializedView => true,
6466 QueryLifetime::Subscribe => true,
6467 QueryLifetime::View => true,
6468 QueryLifetime::Source => true,
6469 }
6470 }
6471
6472 pub fn allow_show(&self) -> bool {
6474 match self {
6475 QueryLifetime::OneShot => true,
6476 QueryLifetime::Index => false,
6477 QueryLifetime::MaterializedView => false,
6478 QueryLifetime::Subscribe => true, QueryLifetime::View => false,
6480 QueryLifetime::Source => false,
6481 }
6482 }
6483}
6484
6485#[derive(Debug, Clone)]
6487pub struct CteDesc {
6488 pub name: String,
6489 pub desc: RelationDesc,
6490}
6491
6492#[derive(Debug, Clone)]
6494pub struct QueryContext<'a> {
6495 pub scx: &'a StatementContext<'a>,
6497 pub lifetime: QueryLifetime,
6499 pub outer_scopes: Vec<Scope>,
6501 pub outer_relation_types: Vec<SqlRelationType>,
6503 pub ctes: BTreeMap<LocalId, CteDesc>,
6505 pub name_manager: Rc<RefCell<NameManager>>,
6507 pub recursion_guard: RecursionGuard,
6508}
6509
6510impl CheckedRecursion for QueryContext<'_> {
6511 fn recursion_guard(&self) -> &RecursionGuard {
6512 &self.recursion_guard
6513 }
6514}
6515
6516impl<'a> QueryContext<'a> {
6517 pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6518 QueryContext {
6519 scx,
6520 lifetime,
6521 outer_scopes: vec![],
6522 outer_relation_types: vec![],
6523 ctes: BTreeMap::new(),
6524 name_manager: Rc::new(RefCell::new(NameManager::new())),
6525 recursion_guard: RecursionGuard::with_limit(1024), }
6527 }
6528
6529 fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6530 expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6531 }
6532
6533 fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6536 let ctes = self.ctes.clone();
6537 let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6538 let outer_relation_types = iter::once(relation_type)
6539 .chain(self.outer_relation_types.clone())
6540 .collect();
6541 let name_manager = Rc::clone(&self.name_manager);
6543
6544 QueryContext {
6545 scx: self.scx,
6546 lifetime: self.lifetime,
6547 outer_scopes,
6548 outer_relation_types,
6549 ctes,
6550 name_manager,
6551 recursion_guard: self.recursion_guard.clone(),
6552 }
6553 }
6554
6555 fn empty_derived_context(&self) -> QueryContext<'a> {
6557 let scope = Scope::empty();
6558 let ty = SqlRelationType::empty();
6559 self.derived_context(scope, ty)
6560 }
6561
6562 pub fn resolve_table_name(
6565 &self,
6566 object: ResolvedItemName,
6567 ) -> Result<(HirRelationExpr, Scope), PlanError> {
6568 match object {
6569 ResolvedItemName::Item {
6570 id,
6571 full_name,
6572 version,
6573 ..
6574 } => {
6575 let item = self.scx.get_item(&id).at_version(version);
6576 let desc = match item.relation_desc() {
6577 Some(desc) => desc.clone(),
6578 None => {
6579 return Err(PlanError::InvalidDependency {
6580 name: full_name.to_string(),
6581 item_type: item.item_type(),
6582 });
6583 }
6584 };
6585 let expr = HirRelationExpr::Get {
6586 id: Id::Global(item.global_id()),
6587 typ: desc.typ().clone(),
6588 };
6589
6590 let name = full_name.into();
6591 let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6592
6593 Ok((expr, scope))
6594 }
6595 ResolvedItemName::Cte { id, name } => {
6596 let name = name.into();
6597 let cte = self.ctes.get(&id).unwrap();
6598 let expr = HirRelationExpr::Get {
6599 id: Id::Local(id),
6600 typ: cte.desc.typ().clone(),
6601 };
6602
6603 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6604
6605 Ok((expr, scope))
6606 }
6607 ResolvedItemName::ContinualTask { id, name } => {
6608 let cte = self.ctes.get(&id).unwrap();
6609 let expr = HirRelationExpr::Get {
6610 id: Id::Local(id),
6611 typ: cte.desc.typ().clone(),
6612 };
6613
6614 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6615
6616 Ok((expr, scope))
6617 }
6618 ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6619 }
6620 }
6621
6622 pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6625 self.scx.humanize_scalar_type(typ, postgres_compat)
6626 }
6627}
6628
6629#[derive(Debug, Clone)]
6631pub struct ExprContext<'a> {
6632 pub qcx: &'a QueryContext<'a>,
6633 pub name: &'a str,
6635 pub scope: &'a Scope,
6638 pub relation_type: &'a SqlRelationType,
6641 pub allow_aggregates: bool,
6643 pub allow_subqueries: bool,
6645 pub allow_parameters: bool,
6647 pub allow_windows: bool,
6649}
6650
6651impl CheckedRecursion for ExprContext<'_> {
6652 fn recursion_guard(&self) -> &RecursionGuard {
6653 &self.qcx.recursion_guard
6654 }
6655}
6656
6657impl<'a> ExprContext<'a> {
6658 pub fn catalog(&self) -> &dyn SessionCatalog {
6659 self.qcx.scx.catalog
6660 }
6661
6662 pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6663 let mut ecx = self.clone();
6664 ecx.name = name;
6665 ecx
6666 }
6667
6668 pub fn column_type<E>(&self, expr: &E) -> E::Type
6669 where
6670 E: AbstractExpr,
6671 {
6672 expr.typ(
6673 &self.qcx.outer_relation_types,
6674 self.relation_type,
6675 &self.qcx.scx.param_types.borrow(),
6676 )
6677 }
6678
6679 pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6680 where
6681 E: AbstractExpr,
6682 {
6683 self.column_type(expr).scalar_type()
6684 }
6685
6686 fn derived_query_context(&self) -> QueryContext<'_> {
6687 let mut scope = self.scope.clone();
6688 scope.lateral_barrier = true;
6689 self.qcx.derived_context(scope, self.relation_type.clone())
6690 }
6691
6692 pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6693 self.qcx.scx.require_feature_flag(flag)
6694 }
6695
6696 pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6697 &self.qcx.scx.param_types
6698 }
6699
6700 pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6703 self.qcx.scx.humanize_scalar_type(typ, postgres_compat)
6704 }
6705
6706 pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6707 self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6708 }
6709}
6710
6711#[derive(Debug, Clone)]
6717pub struct NameManager(BTreeSet<Arc<str>>);
6718
6719impl NameManager {
6720 pub fn new() -> Self {
6722 Self(BTreeSet::new())
6723 }
6724
6725 fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6728 let s = s.as_ref();
6729 if let Some(interned) = self.0.get(s) {
6730 Arc::clone(interned)
6731 } else {
6732 let interned: Arc<str> = Arc::from(s);
6733 self.0.insert(Arc::clone(&interned));
6734 interned
6735 }
6736 }
6737
6738 pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6741 self.intern(item.column_name.as_str())
6757 }
6758}
6759
6760#[cfg(test)]
6761mod test {
6762 use super::*;
6763
6764 #[mz_ore::test]
6769 pub fn test_name_manager_string_interning() {
6770 let mut nm = NameManager::new();
6771
6772 let orig_hi = "hi";
6773 let hi = nm.intern(orig_hi);
6774 let hello = nm.intern("hello");
6775
6776 assert_ne!(hi.as_ptr(), hello.as_ptr());
6777
6778 let hi2 = nm.intern("hi");
6780 assert_eq!(hi.as_ptr(), hi2.as_ptr());
6781
6782 let s = format!(
6784 "{}{}",
6785 hi.chars().nth(0).unwrap(),
6786 hi2.chars().nth(1).unwrap()
6787 );
6788 assert_ne!(orig_hi.as_ptr(), s.as_ptr());
6790
6791 let hi3 = nm.intern(s);
6792 assert_eq!(hi.as_ptr(), hi3.as_ptr());
6793 }
6794}