1use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::func::variadic::{
50 ArrayCreate, ArrayIndex, Coalesce, Greatest, Least, ListCreate, ListIndex, ListSliceLinear,
51 MapBuild, RecordCreate,
52};
53use mz_expr::virtual_syntax::AlgExcept;
54use mz_expr::{
55 Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, TableFunc,
56 func as expr_func,
57};
58use mz_ore::assert_none;
59use mz_ore::collections::CollectionExt;
60use mz_ore::error::ErrorExt;
61use mz_ore::id_gen::IdGen;
62use mz_ore::option::FallibleMapExt;
63use mz_ore::stack::{CheckedRecursion, RecursionGuard};
64use mz_ore::str::StrExt;
65use mz_repr::adt::char::CharLength;
66use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
67use mz_repr::adt::timestamp::TimestampPrecision;
68use mz_repr::adt::varchar::VarCharMaxLength;
69use mz_repr::{
70 CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector,
71 ReprColumnType, Row, RowArena, SqlColumnType, SqlRelationType, SqlScalarType,
72 UNKNOWN_COLUMN_NAME, strconv,
73};
74use mz_sql_parser::ast::display::AstDisplay;
75use mz_sql_parser::ast::visit::Visit;
76use mz_sql_parser::ast::visit_mut::{self, VisitMut};
77use mz_sql_parser::ast::{
78 AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
79 CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
80 Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
81 JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
82 MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
83 SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
84 TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
85 WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
86};
87use mz_sql_parser::ident;
88
89use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
90use crate::func::{self, Func, FuncSpec, TableFuncImpl};
91use crate::names::{
92 Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
93};
94use crate::plan::PlanError::InvalidWmrRecursionLimit;
95use crate::plan::error::PlanError;
96use crate::plan::hir::{
97 AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
98 BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
99 HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
100 ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
101};
102use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
103use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
104use crate::plan::statement::{StatementContext, StatementDesc, show};
105use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
106use crate::plan::{
107 Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
108 literal, transform_ast,
109};
110use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
111use crate::session::vars::{self, FeatureFlag};
112use crate::{ORDINALITY_COL_NAME, normalize};
113
/// The result of planning a root-level query.
///
/// `E` is the type of the planned expression. Within this file it is
/// instantiated with `HirRelationExpr` (by `plan_root_query` and
/// `plan_ct_query`) and with `Vec<HirScalarExpr>` (for the `RETURNING` clause
/// built by `plan_insert_query`).
#[derive(Debug)]
pub struct PlannedRootQuery<E> {
    /// The planned expression.
    pub expr: E,
    /// Names and types of the columns the planned query outputs.
    pub desc: RelationDesc,
    /// Ordering, limit, offset, and projection that accompany `expr`.
    pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
    /// The scope that planning produced.
    pub scope: Scope,
}
121
122#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
131pub fn plan_root_query(
132 scx: &StatementContext,
133 mut query: Query<Aug>,
134 lifetime: QueryLifetime,
135) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
136 transform_ast::transform(scx, &mut query)?;
137 let mut qcx = QueryContext::root(scx, lifetime);
138 let PlannedQuery {
139 mut expr,
140 scope,
141 order_by,
142 limit,
143 offset,
144 project,
145 group_size_hints,
146 } = plan_query(&mut qcx, &query)?;
147
148 let mut finishing = RowSetFinishing {
149 limit,
150 offset,
151 project,
152 order_by,
153 };
154
155 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
161
162 if lifetime.is_maintained() {
163 expr.finish_maintained(&mut finishing, group_size_hints);
164 }
165
166 let typ = qcx.relation_type(&expr);
167 let typ = SqlRelationType::new(
168 finishing
169 .project
170 .iter()
171 .map(|i| typ.column_types[*i].clone())
172 .collect(),
173 );
174 let desc = RelationDesc::new(typ, scope.column_names());
175
176 Ok(PlannedRootQuery {
177 expr,
178 desc,
179 finishing,
180 scope,
181 })
182}
183
184#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
186pub fn plan_ct_query(
187 qcx: &mut QueryContext,
188 mut query: Query<Aug>,
189) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
190 transform_ast::transform(qcx.scx, &mut query)?;
191 let PlannedQuery {
192 mut expr,
193 scope,
194 order_by,
195 limit,
196 offset,
197 project,
198 group_size_hints,
199 } = plan_query(qcx, &query)?;
200
201 let mut finishing = RowSetFinishing {
202 limit,
203 offset,
204 project,
205 order_by,
206 };
207
208 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
214
215 expr.finish_maintained(&mut finishing, group_size_hints);
216
217 let typ = qcx.relation_type(&expr);
218 let typ = SqlRelationType::new(
219 finishing
220 .project
221 .iter()
222 .map(|i| typ.column_types[*i].clone())
223 .collect(),
224 );
225 let desc = RelationDesc::new(typ, scope.column_names());
226
227 Ok(PlannedRootQuery {
228 expr,
229 desc,
230 finishing,
231 scope,
232 })
233}
234
235fn try_push_projection_order_by(
246 expr: &mut HirRelationExpr,
247 project: &mut Vec<usize>,
248 order_by: &mut Vec<ColumnOrder>,
249) -> bool {
250 let mut unproject = vec![None; expr.arity()];
251 for (out_i, in_i) in project.iter().copied().enumerate() {
252 unproject[in_i] = Some(out_i);
253 }
254 if order_by
255 .iter()
256 .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
257 {
258 let trivial_project = (0..project.len()).collect();
259 *expr = expr.take().project(mem::replace(project, trivial_project));
260 for ob in order_by {
261 ob.column = unproject[ob.column].unwrap();
262 }
263 true
264 } else {
265 false
266 }
267}
268
/// Plans an `INSERT` into `table_name`.
///
/// `columns` is the optional explicit target column list (empty means "all
/// columns in table order"), `source` supplies the rows, and `returning` is
/// the list of `RETURNING` items.
///
/// On success returns the target's `CatalogItemId`, a relation expression
/// producing rows in the table's column order (source columns cast to the
/// target types, omitted columns filled from their defaults), and the planned
/// `RETURNING` expressions.
///
/// # Errors
///
/// Fails if the target is not a table, is not writable, or is a system table;
/// if a named column does not exist or is listed more than once; if the
/// number of source expressions does not fit the target columns; if a source
/// column cannot be cast to its target type; or if planning any nested
/// expression fails.
pub fn plan_insert_query(
    scx: &StatementContext,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    source: InsertSource<Aug>,
    returning: Vec<SelectItem<Aug>>,
) -> Result<
    (
        CatalogItemId,
        HirRelationExpr,
        PlannedRootQuery<Vec<HirScalarExpr>>,
    ),
    PlanError,
> {
    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    let table = scx.get_item_by_resolved_name(&table_name)?;

    // Validate the target of the insert.
    if table.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot insert into {} '{}'",
            table.item_type(),
            table_name.full_name_str()
        );
    }
    let desc = table.relation_desc().expect("table has desc");
    // The writable table details are used below as one default expression per
    // table column.
    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| {
            sql_err!(
                "cannot insert into non-writeable table '{}'",
                table_name.full_name_str()
            )
        })?
        .to_vec();

    for default in &mut defaults {
        transform_ast::transform(scx, default)?;
    }

    if table.id().is_system() {
        sql_bail!(
            "cannot insert into system table '{}'",
            table_name.full_name_str()
        );
    }

    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();

    // Target scalar type for each source column, in source order.
    let mut source_types = Vec::with_capacity(columns.len());
    // `ordering[i]` is the table column index that source column `i` feeds.
    let mut ordering = Vec::with_capacity(columns.len());

    if columns.is_empty() {
        // No explicit column list: the source maps onto the table's columns
        // in their natural order.
        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
        ordering.extend(0..desc.arity());
    } else {
        let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
            .iter()
            .enumerate()
            .map(|(idx, (name, typ))| (name, (idx, typ)))
            .collect();

        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push(&typ.scalar_type);
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.quoted(),
                    table_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.quoted());
        }
    };

    // Plan the source of the insert.
    let expr = match source {
        InsertSource::Query(mut query) => {
            transform_ast::transform(scx, &mut query)?;

            match query {
                // A bare `VALUES` list (no CTEs, ORDER BY, LIMIT, or OFFSET)
                // is planned by `plan_values_insert`, which is given the
                // target column names and types.
                Query {
                    body: SetExpr::Values(Values(values)),
                    ctes,
                    order_by,
                    limit: None,
                    offset: None,
                } if ctes.is_empty() && order_by.is_empty() => {
                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
                    plan_values_insert(&qcx, &names, &source_types, &values)?
                }
                // Anything else is planned as an ordinary nested query.
                _ => {
                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
                    expr
                }
            }
        }
        // `DEFAULT VALUES`: a single row with no columns; every table column
        // is filled from its default below.
        InsertSource::DefaultValues => {
            HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
        }
    };

    let expr_arity = expr.arity();

    // Without an explicit column list the source may supply up to as many
    // columns as the table has; with one, at most as many as were listed.
    let max_columns = if columns.is_empty() {
        desc.arity()
    } else {
        columns.len()
    };
    if expr_arity > max_columns {
        sql_bail!("INSERT has more expressions than target columns");
    }
    // An explicit column list must be fully supplied. With an implicit list
    // `columns.len()` is zero, so supplying fewer expressions is accepted.
    if expr_arity < columns.len() {
        sql_bail!("INSERT has more target columns than expressions");
    }

    // Keep only the entries for the columns the source actually supplies.
    source_types.truncate(expr_arity);
    ordering.truncate(expr_arity);

    // Cast each source column to its target column type, reporting a failure
    // in terms of the target column's name.
    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
        sql_err!(
            "column {} is of type {} but expression is of type {}",
            desc.get_name(ordering[e.column]).quoted(),
            qcx.humanize_scalar_type(&e.target_type, false),
            qcx.humanize_scalar_type(&e.source_type, false),
        )
    })?;

    // Default expressions appended (via `map`) after the source columns.
    let mut map_exprs = vec![];
    // For each table column, the index of the column that provides its value.
    let mut project_key = Vec::with_capacity(desc.arity());

    // Table column index -> source column index.
    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();

    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
    for (col_idx, (col_typ, default)) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            project_key.push(*src_idx);
        } else {
            // Not supplied by the source: plan the column's default and refer
            // to the mapped column it will occupy.
            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
            project_key.push(expr_arity + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    // Plan the `RETURNING` items against the table's own columns.
    let returning = {
        let (scope, typ) = if let ResolvedItemName::Item {
            full_name,
            version: _,
            ..
        } = table_name
        {
            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
            let typ = desc.typ().clone();
            (scope, typ)
        } else {
            (Scope::empty(), SqlRelationType::empty())
        };
        let ecx = &ExprContext {
            qcx: &qcx,
            name: "RETURNING clause",
            scope: &scope,
            relation_type: &typ,
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: true,
            allow_windows: false,
        };
        let table_func_names = BTreeMap::new();
        let mut output_columns = vec![];
        let mut new_exprs = vec![];
        let mut new_type = SqlRelationType::empty();
        for mut si in returning {
            transform_ast::transform(scx, &mut si)?;
            // One select item may expand into several output columns.
            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
                let expr = match &select_item {
                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
                };
                output_columns.push(column_name);
                let typ = ecx.column_type(&expr);
                new_type.column_types.push(typ);
                new_exprs.push(expr);
            }
        }
        let desc = RelationDesc::new(new_type, output_columns);
        let desc_arity = desc.arity();
        PlannedRootQuery {
            expr: new_exprs,
            desc,
            finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
            scope,
        }
    };

    Ok((
        table.id(),
        // Append the defaults, then rearrange into table column order.
        expr.map(map_exprs).project(project_key),
        returning,
    ))
}
485
/// Resolves the target item and column list of a `COPY` statement.
///
/// `columns` is the optional explicit column list; an empty list selects all
/// columns of the item.
///
/// On success returns:
/// * the item's `CatalogItemId`,
/// * a `RelationDesc` for the copied columns (the full item description when
///   `columns` is empty, otherwise just the named columns in the given order),
/// * the `ColumnIndex` of each copied column, in the same order, and
/// * for items with writable table details, a `MapFilterProject` that takes
///   rows shaped like `columns`, appends the lowered default expression of
///   every column not named in `columns`, and projects into table column
///   order; `None` for other items.
///
/// # Errors
///
/// Returns `PlanError::InvalidDependency` if the item has no relation
/// description, and an error if a named column does not exist or is listed
/// more than once, or if planning/lowering a default expression fails.
pub fn plan_copy_item(
    scx: &StatementContext,
    item_name: ResolvedItemName,
    columns: Vec<Ident>,
) -> Result<
    (
        CatalogItemId,
        RelationDesc,
        Vec<ColumnIndex>,
        Option<MapFilterProject>,
    ),
    PlanError,
> {
    let item = scx.get_item_by_resolved_name(&item_name)?;
    let fullname = scx.catalog.resolve_full_name(item.name());
    let table_desc = match item.relation_desc() {
        Some(desc) => desc.into_owned(),
        None => {
            return Err(PlanError::InvalidDependency {
                name: fullname.to_string(),
                item_type: item.item_type().to_string(),
            });
        }
    };
    let mut ordering = Vec::with_capacity(columns.len());

    // Only items with writable table details get an MFP that fills in
    // defaults for omitted columns.
    let mfp = if let Some(table_defaults) = item.writable_table_details() {
        let mut table_defaults = table_defaults.to_vec();

        for default in &mut table_defaults {
            transform_ast::transform(scx, default)?;
        }

        // Normalized names of the columns the source rows contain.
        let source_column_names: Vec<_> = columns
            .iter()
            .cloned()
            .map(normalize::column_name)
            .collect();

        let mut default_exprs = Vec::new();
        let mut project_keys = Vec::with_capacity(table_desc.arity());

        // For each table column, either project it from the source or supply
        // an expression that computes its default value.
        let column_details = table_desc.iter().zip_eq(table_defaults);
        for ((col_name, col_type), col_default) in column_details {
            let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
            if let Some(src_idx) = maybe_src_idx {
                project_keys.push(src_idx);
            } else {
                // The column is absent from the source: plan its default and
                // lower it so it can be stored in the MFP.
                let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
                let mir = hir.lower_uncorrelated(scx.catalog.system_vars())?;
                project_keys.push(source_column_names.len() + default_exprs.len());
                default_exprs.push(mir);
            }
        }

        let mfp = MapFilterProject::new(source_column_names.len())
            .map(default_exprs)
            .project(project_keys);
        Some(mfp)
    } else {
        None
    };

    let source_desc = if columns.is_empty() {
        // No explicit column list: copy every column, in table order.
        let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
        ordering.extend(indexes);

        table_desc
    } else {
        let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
        let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
            .iter_all()
            .map(|(idx, name, typ)| (name, (*idx, typ)))
            .collect();

        let mut names = Vec::with_capacity(columns.len());
        let mut source_types = Vec::with_capacity(columns.len());

        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push((*typ).clone());
                names.push(c.clone());
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.quoted(),
                    item_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.quoted());
        }

        // Describe just the named columns, in the order they were given.
        RelationDesc::new(SqlRelationType::new(source_types), names)
    };

    Ok((item.id(), source_desc, ordering, mfp))
}
613
614pub fn plan_copy_from(
618 scx: &StatementContext,
619 table_name: ResolvedItemName,
620 columns: Vec<Ident>,
621) -> Result<
622 (
623 CatalogItemId,
624 RelationDesc,
625 Vec<ColumnIndex>,
626 Option<MapFilterProject>,
627 ),
628 PlanError,
629> {
630 let table = scx.get_item_by_resolved_name(&table_name)?;
631
632 if table.item_type() != CatalogItemType::Table {
634 sql_bail!(
635 "cannot insert into {} '{}'",
636 table.item_type(),
637 table_name.full_name_str()
638 );
639 }
640
641 let _ = table.writable_table_details().ok_or_else(|| {
642 sql_err!(
643 "cannot insert into non-writeable table '{}'",
644 table_name.full_name_str()
645 )
646 })?;
647
648 if table.id().is_system() {
649 sql_bail!(
650 "cannot insert into system table '{}'",
651 table_name.full_name_str()
652 );
653 }
654 let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
655
656 Ok((id, desc, ordering, mfp))
657}
658
659pub fn plan_copy_from_rows(
662 pcx: &PlanContext,
663 catalog: &dyn SessionCatalog,
664 target_id: CatalogItemId,
665 target_name: String,
666 columns: Vec<ColumnIndex>,
667 rows: Vec<mz_repr::Row>,
668) -> Result<HirRelationExpr, PlanError> {
669 let scx = StatementContext::new(Some(pcx), catalog);
670
671 let table = catalog
673 .try_get_item(&target_id)
674 .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
675 .at_version(RelationVersionSelector::Latest);
676
677 let mut defaults = table
678 .writable_table_details()
679 .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
680 .to_vec();
681
682 for default in &mut defaults {
683 transform_ast::transform(&scx, default)?;
684 }
685
686 let desc = table.relation_desc().expect("table has desc");
687 let column_types = columns
688 .iter()
689 .map(|x| desc.get_type(x).clone())
690 .map(|mut x| {
691 x.nullable = true;
694 x
695 })
696 .collect();
697 let typ = SqlRelationType::new(column_types);
698 let expr = HirRelationExpr::Constant {
699 rows,
700 typ: typ.clone(),
701 };
702
703 let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
709 if columns == default {
710 return Ok(expr);
711 }
712
713 let mut map_exprs = vec![];
715 let mut project_key = Vec::with_capacity(desc.arity());
716
717 let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
719
720 let column_details = desc.iter_all().zip_eq(defaults);
721 for ((col_idx, _col_name, col_typ), default) in column_details {
722 if let Some(src_idx) = col_to_source.get(&col_idx) {
723 project_key.push(*src_idx);
724 } else {
725 let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
726 project_key.push(typ.arity() + map_exprs.len());
727 map_exprs.push(hir);
728 }
729 }
730
731 Ok(expr.map(map_exprs).project(project_key))
732}
733
/// The plan for a statement that reads from a table and then writes to it,
/// as produced for `DELETE` and `UPDATE` by `plan_mutation_query_inner`.
pub struct ReadThenWritePlan {
    /// The table being mutated.
    pub id: CatalogItemId,
    /// The read of the table, restricted by the statement's `WHERE` and
    /// `USING` clauses when present.
    pub selection: HirRelationExpr,
    /// New value expressions keyed by the index of the column they assign.
    /// Empty when the statement has no assignments.
    pub assignments: BTreeMap<usize, HirScalarExpr>,
    /// The finishing to apply to `selection`.
    pub finishing: RowSetFinishing,
}
746
747pub fn plan_delete_query(
748 scx: &StatementContext,
749 mut delete_stmt: DeleteStatement<Aug>,
750) -> Result<ReadThenWritePlan, PlanError> {
751 transform_ast::transform(scx, &mut delete_stmt)?;
752
753 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
754 plan_mutation_query_inner(
755 qcx,
756 delete_stmt.table_name,
757 delete_stmt.alias,
758 delete_stmt.using,
759 vec![],
760 delete_stmt.selection,
761 )
762}
763
764pub fn plan_update_query(
765 scx: &StatementContext,
766 mut update_stmt: UpdateStatement<Aug>,
767) -> Result<ReadThenWritePlan, PlanError> {
768 transform_ast::transform(scx, &mut update_stmt)?;
769
770 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
771
772 plan_mutation_query_inner(
773 qcx,
774 update_stmt.table_name,
775 update_stmt.alias,
776 vec![],
777 update_stmt.assignments,
778 update_stmt.selection,
779 )
780}
781
/// Shared planning for statements that mutate a table in place (`DELETE` and
/// `UPDATE` in this file).
///
/// * `table_name` / `alias`: the target table and its optional alias.
/// * `using`: additional tables from a `USING` clause (may be empty).
/// * `assignments`: `SET` assignments (empty for `DELETE`).
/// * `selection`: the optional `WHERE` expression.
///
/// Returns a `ReadThenWritePlan` whose `selection` reads the rows to mutate,
/// whose `assignments` map column indexes to their new value expressions, and
/// whose `finishing` is a trivial finishing over all table columns.
///
/// # Errors
///
/// Fails if the target is not a user item, is not a table, is not writable,
/// or is a system table; if an assigned column is unknown or assigned twice;
/// or if planning any expression fails.
pub fn plan_mutation_query_inner(
    qcx: QueryContext,
    table_name: ResolvedItemName,
    alias: Option<TableAlias>,
    using: Vec<TableWithJoins<Aug>>,
    assignments: Vec<Assignment<Aug>>,
    selection: Option<Expr<Aug>>,
) -> Result<ReadThenWritePlan, PlanError> {
    // Get the ID and version of the item being mutated.
    let (id, version) = match table_name {
        ResolvedItemName::Item { id, version, .. } => (id, version),
        _ => sql_bail!("cannot mutate non-user table"),
    };

    // Validate the target of the mutation.
    let item = qcx.scx.get_item(&id).at_version(version);
    if item.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot mutate {} '{}'",
            item.item_type(),
            table_name.full_name_str()
        );
    }
    // Only the presence of the writable details matters here.
    let _ = item.writable_table_details().ok_or_else(|| {
        sql_err!(
            "cannot mutate non-writeable table '{}'",
            table_name.full_name_str()
        )
    })?;
    if id.is_system() {
        sql_bail!(
            "cannot mutate system table '{}'",
            table_name.full_name_str()
        );
    }

    // Build the read of the table and the scope expressions resolve against.
    let (mut get, scope) = qcx.resolve_table_name(table_name)?;
    let scope = plan_table_alias(scope, alias.as_ref())?;
    let desc = item.relation_desc().expect("table has desc");
    let relation_type = qcx.relation_type(&get);

    if using.is_empty() {
        // Without `USING`, the `WHERE` clause is a plain filter on the table.
        if let Some(expr) = selection {
            let ecx = &ExprContext {
                qcx: &qcx,
                name: "WHERE clause",
                scope: &scope,
                relation_type: &relation_type,
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
            get = get.filter(vec![expr]);
        }
    } else {
        // With `USING`, the restriction is built by a dedicated helper.
        get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
    }

    // Plan each assignment, keyed by the index of the column it targets.
    let mut sets = BTreeMap::new();
    for Assignment { id, value } in assignments {
        let name = normalize::column_name(id);
        // Look up the column's index and type by name.
        match desc.get_by_name(&name) {
            Some((idx, typ)) => {
                let ecx = &ExprContext {
                    qcx: &qcx,
                    name: "SET clause",
                    scope: &scope,
                    relation_type: &relation_type,
                    allow_aggregates: false,
                    allow_subqueries: false,
                    allow_parameters: true,
                    allow_windows: false,
                };
                // The new value must be assignment-castable to the column type.
                let expr = plan_expr(ecx, &value)?.cast_to(
                    ecx,
                    CastContext::Assignment,
                    &typ.scalar_type,
                )?;

                if sets.insert(idx, expr).is_some() {
                    sql_bail!("column {} set twice", name)
                }
            }
            None => sql_bail!("unknown column {}", name),
        };
    }

    // Trivial finishing: no ordering, limit, or offset; keep all columns.
    let finishing = RowSetFinishing {
        order_by: vec![],
        limit: None,
        offset: 0,
        project: (0..desc.arity()).collect(),
    };

    Ok(ReadThenWritePlan {
        id,
        selection: get,
        finishing,
        assignments: sets,
    })
}
887
/// Restricts `get` (the read of the mutated table) according to a `USING`
/// clause and an optional `WHERE` expression.
///
/// The `USING` tables are cross joined with each other. If `selection` is
/// present it is planned against the scope of `USING` tables followed by the
/// target table and then applied as a filter on the `USING` relation, with
/// references to the target table's columns rewritten to one level further
/// out. The result is `get` filtered by the existence of a row in that
/// `USING` relation.
///
/// # Errors
///
/// Fails if planning the joins or the `WHERE` expression fails, or if the
/// `USING` scope cannot be combined with `outer_scope`.
fn handle_mutation_using_clause(
    qcx: &QueryContext,
    selection: Option<Expr<Aug>>,
    using: Vec<TableWithJoins<Aug>>,
    get: HirRelationExpr,
    outer_scope: Scope,
) -> Result<HirRelationExpr, PlanError> {
    // Plan the `USING` tables as a chain of cross joins, starting from the
    // join identity. The target table is not part of this relation or scope.
    let (mut using_rel_expr, using_scope) =
        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
            let (left, left_scope) = l;
            plan_join(
                qcx,
                left,
                left_scope,
                &Join {
                    relation: TableFactor::NestedJoin {
                        join: Box::new(twj),
                        alias: None,
                    },
                    join_operator: JoinOperator::CrossJoin,
                },
            )
        })?;

    if let Some(expr) = selection {
        // Join the `USING` tables with the target table (`USING` columns
        // first) purely to obtain a relation type and scope in which the
        // `WHERE` expression can reference both sides.
        let on = HirScalarExpr::literal_true();
        let joined = using_rel_expr
            .clone()
            .join(get.clone(), on, JoinKind::Inner);
        let joined_scope = using_scope.product(outer_scope)?;
        let joined_relation_type = qcx.relation_type(&joined);

        let ecx = &ExprContext {
            qcx,
            name: "WHERE clause",
            scope: &joined_scope,
            relation_type: &joined_relation_type,
            allow_aggregates: false,
            allow_subqueries: true,
            allow_parameters: true,
            allow_windows: false,
        };

        // Plan the filter expression in terms of the joined relation.
        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;

        // Columns at or beyond the `USING` relation's arity came from the
        // target table (the right side of the join). Rewrite them to be
        // relative to the target table itself and one level further out.
        // NOTE(review): the level bump presumably turns them into correlated
        // references to `get`, which wraps this relation below — confirm.
        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
        use mz_expr::visit::Visit;
        expr.visit_mut_post(&mut |e| {
            if let HirScalarExpr::Column(c, _name) = e {
                if c.column >= using_rel_arity {
                    c.level += 1;
                    c.column -= using_rel_arity;
                };
            }
        })?;

        // The filter applies to the `USING` relation, not to the joined
        // relation built above.
        using_rel_expr = using_rel_expr.filter(vec![expr]);
    } else {
        // No `WHERE` clause: the product is still computed so that an error
        // from combining the two scopes is surfaced; its value is unused.
        let _joined_scope = using_scope.product(outer_scope)?;
    }
    // Keep the target rows for which the `USING` relation is non-empty.
    Ok(get.filter(vec![using_rel_expr.exists()]))
}
989
/// The error returned by `cast_relation` when a column cannot be cast to its
/// target type.
#[derive(Debug)]
pub(crate) struct CastRelationError {
    /// Index of the offending column in the input relation.
    pub(crate) column: usize,
    /// The scalar type the column actually has.
    pub(crate) source_type: SqlScalarType,
    /// The scalar type the column was supposed to be cast to.
    pub(crate) target_type: SqlScalarType,
}
996
997pub(crate) fn cast_relation<'a, I>(
1001 qcx: &QueryContext,
1002 ccx: CastContext,
1003 expr: HirRelationExpr,
1004 target_types: I,
1005) -> Result<HirRelationExpr, CastRelationError>
1006where
1007 I: IntoIterator<Item = &'a SqlScalarType>,
1008{
1009 let ecx = &ExprContext {
1010 qcx,
1011 name: "values",
1012 scope: &Scope::empty(),
1013 relation_type: &qcx.relation_type(&expr),
1014 allow_aggregates: false,
1015 allow_subqueries: true,
1016 allow_parameters: true,
1017 allow_windows: false,
1018 };
1019 let mut map_exprs = vec![];
1020 let mut project_key = vec![];
1021 for (i, target_typ) in target_types.into_iter().enumerate() {
1022 let expr = HirScalarExpr::column(i);
1023 match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
1027 Ok(cast_expr) => {
1028 if expr == cast_expr {
1029 project_key.push(i);
1031 } else {
1032 project_key.push(ecx.relation_type.arity() + map_exprs.len());
1034 map_exprs.push(cast_expr);
1035 }
1036 }
1037 Err(_) => {
1038 return Err(CastRelationError {
1039 column: i,
1040 source_type: ecx.scalar_type(&expr),
1041 target_type: target_typ.clone(),
1042 });
1043 }
1044 }
1045 }
1046 Ok(expr.map(map_exprs).project(project_key))
1047}
1048
1049pub fn plan_as_of(
1052 scx: &StatementContext,
1053 as_of: Option<AsOf<Aug>>,
1054) -> Result<QueryWhen, PlanError> {
1055 match as_of {
1056 None => Ok(QueryWhen::Immediately),
1057 Some(as_of) => match as_of {
1058 AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1059 AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1060 },
1061 }
1062}
1063
1064pub fn plan_as_of_or_up_to(
1074 scx: &StatementContext,
1075 mut expr: Expr<Aug>,
1076) -> Result<mz_repr::Timestamp, PlanError> {
1077 let scope = Scope::empty();
1078 let desc = RelationDesc::empty();
1079 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1082 transform_ast::transform(scx, &mut expr)?;
1083 let ecx = &ExprContext {
1084 qcx: &qcx,
1085 name: "AS OF or UP TO",
1086 scope: &scope,
1087 relation_type: desc.typ(),
1088 allow_aggregates: false,
1089 allow_subqueries: false,
1090 allow_parameters: false,
1091 allow_windows: false,
1092 };
1093 let hir = plan_expr(ecx, &expr)?.cast_to(
1094 ecx,
1095 CastContext::Assignment,
1096 &SqlScalarType::MzTimestamp,
1097 )?;
1098 if hir.contains_unmaterializable() {
1099 bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1100 }
1101 let timestamp = hir
1108 .into_literal_mz_timestamp()
1109 .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1110 Ok(timestamp)
1111}
1112
1113pub fn plan_secret_as(
1115 scx: &StatementContext,
1116 mut expr: Expr<Aug>,
1117) -> Result<MirScalarExpr, PlanError> {
1118 let scope = Scope::empty();
1119 let desc = RelationDesc::empty();
1120 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1121
1122 transform_ast::transform(scx, &mut expr)?;
1123
1124 let ecx = &ExprContext {
1125 qcx: &qcx,
1126 name: "AS",
1127 scope: &scope,
1128 relation_type: desc.typ(),
1129 allow_aggregates: false,
1130 allow_subqueries: false,
1131 allow_parameters: false,
1132 allow_windows: false,
1133 };
1134 let expr = plan_expr(ecx, &expr)?
1135 .type_as(ecx, &SqlScalarType::Bytes)?
1136 .lower_uncorrelated(scx.catalog.system_vars())?;
1137 Ok(expr)
1138}
1139
1140pub fn plan_webhook_validate_using(
1142 scx: &StatementContext,
1143 validate_using: CreateWebhookSourceCheck<Aug>,
1144) -> Result<WebhookValidation, PlanError> {
1145 let qcx = QueryContext::root(scx, QueryLifetime::Source);
1146
1147 let CreateWebhookSourceCheck {
1148 options,
1149 using: mut expr,
1150 } = validate_using;
1151
1152 let mut column_typs = vec![];
1153 let mut column_names = vec![];
1154
1155 let (bodies, headers, secrets) = options
1156 .map(|o| (o.bodies, o.headers, o.secrets))
1157 .unwrap_or_default();
1158
1159 let mut body_tuples = vec![];
1161 for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1162 let scalar_type = use_bytes
1163 .then_some(SqlScalarType::Bytes)
1164 .unwrap_or(SqlScalarType::String);
1165 let name = alias
1166 .map(|a| a.into_string())
1167 .unwrap_or_else(|| "body".to_string());
1168
1169 column_typs.push(SqlColumnType {
1170 scalar_type,
1171 nullable: false,
1172 });
1173 column_names.push(name);
1174
1175 let column_idx = column_typs.len() - 1;
1177 assert_eq!(
1179 column_idx,
1180 column_names.len() - 1,
1181 "body column names and types don't match"
1182 );
1183 body_tuples.push((column_idx, use_bytes));
1184 }
1185
1186 let mut header_tuples = vec![];
1188
1189 for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1190 let value_type = use_bytes
1191 .then_some(SqlScalarType::Bytes)
1192 .unwrap_or(SqlScalarType::String);
1193 let name = alias
1194 .map(|a| a.into_string())
1195 .unwrap_or_else(|| "headers".to_string());
1196
1197 column_typs.push(SqlColumnType {
1198 scalar_type: SqlScalarType::Map {
1199 value_type: Box::new(value_type),
1200 custom_id: None,
1201 },
1202 nullable: false,
1203 });
1204 column_names.push(name);
1205
1206 let column_idx = column_typs.len() - 1;
1208 assert_eq!(
1210 column_idx,
1211 column_names.len() - 1,
1212 "header column names and types don't match"
1213 );
1214 header_tuples.push((column_idx, use_bytes));
1215 }
1216
1217 let mut validation_secrets = vec![];
1219
1220 for CreateWebhookSourceSecret {
1221 secret,
1222 alias,
1223 use_bytes,
1224 } in secrets
1225 {
1226 let scalar_type = use_bytes
1228 .then_some(SqlScalarType::Bytes)
1229 .unwrap_or(SqlScalarType::String);
1230
1231 column_typs.push(SqlColumnType {
1232 scalar_type,
1233 nullable: false,
1234 });
1235 let ResolvedItemName::Item {
1236 id,
1237 full_name: FullItemName { item, .. },
1238 ..
1239 } = secret
1240 else {
1241 return Err(PlanError::InvalidSecret(Box::new(secret)));
1242 };
1243
1244 let name = if let Some(alias) = alias {
1246 alias.into_string()
1247 } else {
1248 item
1249 };
1250 column_names.push(name);
1251
1252 let column_idx = column_typs.len() - 1;
1255 assert_eq!(
1257 column_idx,
1258 column_names.len() - 1,
1259 "column names and types don't match"
1260 );
1261
1262 validation_secrets.push(WebhookValidationSecret {
1263 id,
1264 column_idx,
1265 use_bytes,
1266 });
1267 }
1268
1269 let relation_typ = SqlRelationType::new(column_typs);
1270 let desc = RelationDesc::new(relation_typ, column_names.clone());
1271 let scope = Scope::from_source(None, column_names);
1272
1273 transform_ast::transform(scx, &mut expr)?;
1274
1275 let ecx = &ExprContext {
1276 qcx: &qcx,
1277 name: "CHECK",
1278 scope: &scope,
1279 relation_type: desc.typ(),
1280 allow_aggregates: false,
1281 allow_subqueries: false,
1282 allow_parameters: false,
1283 allow_windows: false,
1284 };
1285 let expr = plan_expr(ecx, &expr)?
1286 .type_as(ecx, &SqlScalarType::Bool)?
1287 .lower_uncorrelated(scx.catalog.system_vars())?;
1288 let validation = WebhookValidation {
1289 expression: expr,
1290 relation_desc: desc,
1291 bodies: body_tuples,
1292 headers: header_tuples,
1293 secrets: validation_secrets,
1294 };
1295 Ok(validation)
1296}
1297
1298pub fn plan_default_expr(
1299 scx: &StatementContext,
1300 expr: &Expr<Aug>,
1301 target_ty: &SqlScalarType,
1302) -> Result<HirScalarExpr, PlanError> {
1303 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1304 let ecx = &ExprContext {
1305 qcx: &qcx,
1306 name: "DEFAULT expression",
1307 scope: &Scope::empty(),
1308 relation_type: &SqlRelationType::empty(),
1309 allow_aggregates: false,
1310 allow_subqueries: false,
1311 allow_parameters: false,
1312 allow_windows: false,
1313 };
1314 let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1315 Ok(hir)
1316}
1317
1318pub fn plan_params<'a>(
1319 scx: &'a StatementContext,
1320 params: Vec<Expr<Aug>>,
1321 desc: &StatementDesc,
1322) -> Result<Params, PlanError> {
1323 if params.len() != desc.param_types.len() {
1324 sql_bail!(
1325 "expected {} params, got {}",
1326 desc.param_types.len(),
1327 params.len()
1328 );
1329 }
1330
1331 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1332
1333 let mut datums = Row::default();
1334 let mut packer = datums.packer();
1335 let mut actual_types = Vec::new();
1336 let temp_storage = &RowArena::new();
1337 for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1338 transform_ast::transform(scx, &mut expr)?;
1339
1340 let ecx = execute_expr_context(&qcx);
1341 let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1342 let actual_ty = ecx.scalar_type(&ex);
1343 if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1344 return Err(PlanError::WrongParameterType(
1345 i + 1,
1346 ecx.humanize_scalar_type(expected_ty, false),
1347 ecx.humanize_scalar_type(&actual_ty, false),
1348 ));
1349 }
1350 let ex = ex.lower_uncorrelated(scx.catalog.system_vars())?;
1351 let evaled = ex.eval(&[], temp_storage)?;
1352 packer.push(evaled);
1353 actual_types.push(actual_ty);
1354 }
1355 Ok(Params {
1356 datums,
1357 execute_types: actual_types,
1358 expected_types: desc.param_types.clone(),
1359 })
1360}
1361
// Lazily initialized empty scope and empty relation type. `execute_expr_context`
// borrows these so the `ExprContext` it returns does not reference any locals.
static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1364
1365pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1367 ExprContext {
1368 qcx,
1369 name: "EXECUTE",
1370 scope: &EXECUTE_CONTEXT_SCOPE,
1371 relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1372 allow_aggregates: false,
1373 allow_subqueries: false,
1374 allow_parameters: false,
1375 allow_windows: false,
1376 }
1377}
1378
/// The cast context used when checking whether a supplied `EXECUTE` parameter
/// can be cast to the parameter type the statement expects (see
/// `plan_params`). Currently assignment-cast rules.
pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
    LazyLock::new(|| CastContext::Assignment);
1385
1386pub fn plan_index_exprs<'a>(
1387 scx: &'a StatementContext,
1388 on_desc: &RelationDesc,
1389 exprs: Vec<Expr<Aug>>,
1390) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1391 let scope = Scope::from_source(None, on_desc.iter_names());
1392 let qcx = QueryContext::root(scx, QueryLifetime::Index);
1393
1394 let ecx = &ExprContext {
1395 qcx: &qcx,
1396 name: "CREATE INDEX",
1397 scope: &scope,
1398 relation_type: on_desc.typ(),
1399 allow_aggregates: false,
1400 allow_subqueries: false,
1401 allow_parameters: false,
1402 allow_windows: false,
1403 };
1404 let mut out = vec![];
1405 for mut expr in exprs {
1406 transform_ast::transform(scx, &mut expr)?;
1407 let expr = plan_expr_or_col_index(ecx, &expr)?;
1408 let mut expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
1409 let repr_col_types: Vec<ReprColumnType> = on_desc
1410 .typ()
1411 .column_types
1412 .iter()
1413 .map(ReprColumnType::from)
1414 .collect();
1415 expr.reduce(&repr_col_types);
1416 out.push(expr);
1417 }
1418 Ok(out)
1419}
1420
1421fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1422 match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1423 Some(column) => Ok(HirScalarExpr::column(column)),
1424 _ => plan_expr(ecx, e)?.type_as_any(ecx),
1425 }
1426}
1427
1428fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1429 match e {
1430 Expr::Value(Value::Number(n)) => {
1431 let n = n.parse::<usize>().map_err(|e| {
1432 sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1433 })?;
1434 if n < 1 || n > max {
1435 sql_bail!(
1436 "column reference {} in {} is out of range (1 - {})",
1437 n,
1438 name,
1439 max
1440 );
1441 }
1442 Ok(Some(n - 1))
1443 }
1444 _ => Ok(None),
1445 }
1446}
1447
/// The result of planning a [`Query`] via `plan_query`.
///
/// `plan_query_inner` records the ordering, limit, offset, and projection
/// here without applying them to `expr`; a consumer such as
/// `plan_nested_query` applies them afterwards.
struct PlannedQuery {
    /// The planned relation expression.
    expr: HirRelationExpr,
    /// The scope describing the columns of the query.
    scope: Scope,
    /// The planned `ORDER BY` of the query.
    order_by: Vec<ColumnOrder>,
    /// The planned `LIMIT`, if the query has one.
    limit: Option<HirScalarExpr>,
    /// The planned `OFFSET`; a literal `0` when the query has none.
    offset: HirScalarExpr,
    /// The columns to project; `plan_nested_query` applies this after any
    /// top-k (order/limit/offset) step.
    project: Vec<usize>,
    /// Group size hints; `plan_nested_query` passes
    /// `limit_input_group_size` to the top-k step.
    group_size_hints: GroupSizeHints,
}
1462
1463fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1464 qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1465}
1466
1467fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1468 let cte_bindings = plan_ctes(qcx, q)?;
1471
1472 let limit = match &q.limit {
1473 None => None,
1474 Some(Limit {
1475 quantity,
1476 with_ties: false,
1477 }) => {
1478 let ecx = &ExprContext {
1479 qcx,
1480 name: "LIMIT",
1481 scope: &Scope::empty(),
1482 relation_type: &SqlRelationType::empty(),
1483 allow_aggregates: false,
1484 allow_subqueries: true,
1485 allow_parameters: true,
1486 allow_windows: false,
1487 };
1488 let limit = plan_expr(ecx, quantity)?;
1489 let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1490
1491 let limit = if limit.is_constant() {
1492 let arena = RowArena::new();
1493 let limit = limit.lower_uncorrelated(qcx.scx.catalog.system_vars())?;
1494
1495 match limit.eval(&[], &arena)? {
1499 d @ Datum::Int64(v) if v >= 0 => {
1500 HirScalarExpr::literal(d, SqlScalarType::Int64)
1501 }
1502 d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1503 Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1504 _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1505 }
1506 } else {
1507 qcx.scx
1509 .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1510 limit
1511 };
1512
1513 Some(limit)
1514 }
1515 Some(Limit {
1516 quantity: _,
1517 with_ties: true,
1518 }) => bail_unsupported!("FETCH ... WITH TIES"),
1519 };
1520
1521 let offset = match &q.offset {
1522 None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1523 Some(offset) => {
1524 let ecx = &ExprContext {
1525 qcx,
1526 name: "OFFSET",
1527 scope: &Scope::empty(),
1528 relation_type: &SqlRelationType::empty(),
1529 allow_aggregates: false,
1530 allow_subqueries: false,
1531 allow_parameters: true,
1532 allow_windows: false,
1533 };
1534 let offset = plan_expr(ecx, offset)?;
1535 let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1536
1537 let offset = if offset.is_constant() {
1538 let offset_value = offset_into_value(offset)?;
1540 HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1541 } else {
1542 if !offset.contains_parameters() {
1546 return Err(PlanError::InvalidOffset(format!(
1547 "must be simplifiable to a constant, possibly after parameter binding, got {}",
1548 offset
1549 )));
1550 }
1551 offset
1552 };
1553 offset
1554 }
1555 };
1556
1557 let mut planned_query = match &q.body {
1558 SetExpr::Select(s) => {
1559 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1561 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1562
1563 let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1564 PlannedQuery {
1565 expr: plan.expr,
1566 scope: plan.scope,
1567 order_by: plan.order_by,
1568 project: plan.project,
1569 limit,
1570 offset,
1571 group_size_hints,
1572 }
1573 }
1574 _ => {
1575 let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1576 let ecx = &ExprContext {
1577 qcx,
1578 name: "ORDER BY clause of a set expression",
1579 scope: &scope,
1580 relation_type: &qcx.relation_type(&expr),
1581 allow_aggregates: false,
1582 allow_subqueries: true,
1583 allow_parameters: true,
1584 allow_windows: false,
1585 };
1586 let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1587 let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1588 let project = (0..ecx.relation_type.arity()).collect();
1589 PlannedQuery {
1590 expr: expr.map(map_exprs),
1591 scope,
1592 order_by,
1593 limit,
1594 project,
1595 offset,
1596 group_size_hints: GroupSizeHints::default(),
1597 }
1598 }
1599 };
1600
1601 match &q.ctes {
1603 CteBlock::Simple(_) => {
1604 for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1605 if let Some(cte) = qcx.ctes.remove(&id) {
1606 planned_query.expr = HirRelationExpr::Let {
1607 name: cte.name,
1608 id: id.clone(),
1609 value: Box::new(value),
1610 body: Box::new(planned_query.expr),
1611 };
1612 }
1613 if let Some(shadowed_val) = shadowed_val {
1614 qcx.ctes.insert(id, shadowed_val);
1615 }
1616 }
1617 }
1618 CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1619 let MutRecBlockOptionExtracted {
1620 recursion_limit,
1621 return_at_recursion_limit,
1622 error_at_recursion_limit,
1623 seen: _,
1624 } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1625 let limit = match (
1626 recursion_limit,
1627 return_at_recursion_limit,
1628 error_at_recursion_limit,
1629 ) {
1630 (None, None, None) => None,
1631 (Some(max_iters), None, None) => {
1632 Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT))
1633 }
1634 (None, Some(max_iters), None) => Some((max_iters, true)),
1635 (None, None, Some(max_iters)) => Some((max_iters, false)),
1636 _ => {
1637 return Err(InvalidWmrRecursionLimit(
1638 "More than one recursion limit given. \
1639 Please give at most one of RECURSION LIMIT, \
1640 ERROR AT RECURSION LIMIT, \
1641 RETURN AT RECURSION LIMIT."
1642 .to_owned(),
1643 ));
1644 }
1645 }
1646 .try_map(|(max_iters, return_at_limit)| {
1647 Ok::<LetRecLimit, PlanError>(LetRecLimit {
1648 max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit(
1649 "Recursion limit has to be greater than 0.".to_owned(),
1650 ))?,
1651 return_at_limit: *return_at_limit,
1652 })
1653 })?;
1654
1655 let mut bindings = Vec::new();
1656 for (id, value, shadowed_val) in cte_bindings.into_iter() {
1657 if let Some(cte) = qcx.ctes.remove(&id) {
1658 bindings.push((cte.name, id, value, cte.desc.into_typ()));
1659 }
1660 if let Some(shadowed_val) = shadowed_val {
1661 qcx.ctes.insert(id, shadowed_val);
1662 }
1663 }
1664 if !bindings.is_empty() {
1665 planned_query.expr = HirRelationExpr::LetRec {
1666 limit,
1667 bindings,
1668 body: Box::new(planned_query.expr),
1669 }
1670 }
1671 }
1672 }
1673
1674 Ok(planned_query)
1675}
1676
1677pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1679 let offset = offset
1680 .try_into_literal_int64()
1681 .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1682 if offset < 0 {
1683 return Err(PlanError::InvalidOffset(format!(
1684 "must not be negative, got {}",
1685 offset
1686 )));
1687 }
1688 Ok(offset)
1689}
1690
// Generates `MutRecBlockOptionExtracted`, which `plan_query_inner` builds from
// the options of a `WITH MUTUALLY RECURSIVE` block. Each of the three options
// carries a `u64` iteration count; `plan_query_inner` rejects blocks that
// specify more than one of them.
generate_extracted_config!(
    MutRecBlockOption,
    (RecursionLimit, u64),
    (ReturnAtRecursionLimit, u64),
    (ErrorAtRecursionLimit, u64)
);
1697
1698pub fn plan_ctes(
1703 qcx: &mut QueryContext,
1704 q: &Query<Aug>,
1705) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1706 let mut result = Vec::new();
1708 let mut shadowed_descs = BTreeMap::new();
1711
1712 if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1714 sql_bail!(
1715 "WITH query name {} specified more than once",
1716 normalize::ident_ref(ident).quoted()
1717 )
1718 }
1719
1720 match &q.ctes {
1721 CteBlock::Simple(ctes) => {
1722 for cte in ctes.iter() {
1724 let cte_name = normalize::ident(cte.alias.name.clone());
1725 let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1726 let typ = qcx.relation_type(&val);
1727 let mut desc = RelationDesc::new(typ, scope.column_names());
1728 plan_utils::maybe_rename_columns(
1729 format!("CTE {}", cte.alias.name),
1730 &mut desc,
1731 &cte.alias.columns,
1732 )?;
1733 let shadowed = qcx.ctes.insert(
1735 cte.id,
1736 CteDesc {
1737 name: cte_name,
1738 desc,
1739 },
1740 );
1741
1742 result.push((cte.id, val, shadowed));
1743 }
1744 }
1745 CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1746 for cte in ctes.iter() {
1748 let cte_name = normalize::ident(cte.name.clone());
1749 let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1750 for column in cte.columns.iter() {
1751 desc_columns.push((
1752 normalize::column_name(column.name.clone()),
1753 SqlColumnType {
1754 scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1755 nullable: true,
1756 },
1757 ));
1758 }
1759 let desc = RelationDesc::from_names_and_types(desc_columns);
1760 let shadowed = qcx.ctes.insert(
1761 cte.id,
1762 CteDesc {
1763 name: cte_name,
1764 desc,
1765 },
1766 );
1767 if let Some(shadowed) = shadowed {
1769 shadowed_descs.insert(cte.id, shadowed);
1770 }
1771 }
1772
1773 for cte in ctes.iter() {
1775 let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1776
1777 let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1778
1779 if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1780 sql_bail!(
1783 "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1784 );
1785 }
1786
1787 if !proposed_typ.keys.is_empty() {
1788 sql_bail!("[internal error]: WMR CTEs do not support keys");
1791 }
1792
1793 let derived_typ = qcx.relation_type(&val);
1795
1796 let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1797 let cte_name = normalize::ident(cte.name.clone());
1798 let proposed_typ = proposed_typ
1799 .column_types
1800 .iter()
1801 .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1802 .collect::<Vec<_>>();
1803 let inferred_typ = derived_typ
1804 .column_types
1805 .iter()
1806 .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1807 .collect::<Vec<_>>();
1808 Err(PlanError::RecursiveTypeMismatch(
1809 cte_name,
1810 proposed_typ,
1811 inferred_typ,
1812 ))
1813 };
1814
1815 if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1816 return type_err(proposed_typ, derived_typ);
1817 }
1818
1819 let val = match cast_relation(
1821 qcx,
1822 CastContext::Assignment,
1827 val,
1828 proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1829 ) {
1830 Ok(val) => val,
1831 Err(_) => return type_err(proposed_typ, derived_typ),
1832 };
1833
1834 result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1835 }
1836 }
1837 }
1838
1839 Ok(result)
1840}
1841
1842pub fn plan_nested_query(
1843 qcx: &mut QueryContext,
1844 q: &Query<Aug>,
1845) -> Result<(HirRelationExpr, Scope), PlanError> {
1846 let PlannedQuery {
1847 mut expr,
1848 scope,
1849 order_by,
1850 limit,
1851 offset,
1852 project,
1853 group_size_hints,
1854 } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1855 if limit.is_some()
1856 || !offset
1857 .clone()
1858 .try_into_literal_int64()
1859 .is_ok_and(|offset| offset == 0)
1860 {
1861 expr = HirRelationExpr::top_k(
1862 expr,
1863 vec![],
1864 order_by,
1865 limit,
1866 offset,
1867 group_size_hints.limit_input_group_size,
1868 );
1869 }
1870 Ok((expr.project(project), scope))
1871}
1872
1873fn plan_set_expr(
1874 qcx: &mut QueryContext,
1875 q: &SetExpr<Aug>,
1876) -> Result<(HirRelationExpr, Scope), PlanError> {
1877 match q {
1878 SetExpr::Select(select) => {
1879 let order_by_exprs = Vec::new();
1880 let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
1881 assert!(plan.order_by.is_empty());
1884 Ok((plan.expr.project(plan.project), plan.scope))
1885 }
1886 SetExpr::SetOperation {
1887 op,
1888 all,
1889 left,
1890 right,
1891 } => {
1892 let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
1894 let (right_expr, right_scope) =
1895 qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;
1896
1897 let left_type = qcx.relation_type(&left_expr);
1899 let right_type = qcx.relation_type(&right_expr);
1900 if left_type.arity() != right_type.arity() {
1901 sql_bail!(
1902 "each {} query must have the same number of columns: {} vs {}",
1903 op,
1904 left_type.arity(),
1905 right_type.arity(),
1906 );
1907 }
1908
1909 let left_ecx = &ExprContext {
1914 qcx,
1915 name: &op.to_string(),
1916 scope: &left_scope,
1917 relation_type: &left_type,
1918 allow_aggregates: false,
1919 allow_subqueries: false,
1920 allow_parameters: false,
1921 allow_windows: false,
1922 };
1923 let right_ecx = &ExprContext {
1924 qcx,
1925 name: &op.to_string(),
1926 scope: &right_scope,
1927 relation_type: &right_type,
1928 allow_aggregates: false,
1929 allow_subqueries: false,
1930 allow_parameters: false,
1931 allow_windows: false,
1932 };
1933 let mut left_casts = vec![];
1934 let mut right_casts = vec![];
1935 for (i, (left_type, right_type)) in left_type
1936 .column_types
1937 .iter()
1938 .zip_eq(right_type.column_types.iter())
1939 .enumerate()
1940 {
1941 let types = &[
1942 CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
1943 CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
1944 ];
1945 let target =
1946 typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
1947 match typeconv::plan_cast(
1948 left_ecx,
1949 CastContext::Implicit,
1950 HirScalarExpr::column(i),
1951 &target,
1952 ) {
1953 Ok(expr) => left_casts.push(expr),
1954 Err(_) => sql_bail!(
1955 "{} types {} and {} cannot be matched",
1956 op,
1957 qcx.humanize_scalar_type(&left_type.scalar_type, false),
1958 qcx.humanize_scalar_type(&target, false),
1959 ),
1960 }
1961 match typeconv::plan_cast(
1962 right_ecx,
1963 CastContext::Implicit,
1964 HirScalarExpr::column(i),
1965 &target,
1966 ) {
1967 Ok(expr) => right_casts.push(expr),
1968 Err(_) => sql_bail!(
1969 "{} types {} and {} cannot be matched",
1970 op,
1971 qcx.humanize_scalar_type(&target, false),
1972 qcx.humanize_scalar_type(&right_type.scalar_type, false),
1973 ),
1974 }
1975 }
1976 let lhs = if left_casts
1977 .iter()
1978 .enumerate()
1979 .any(|(i, e)| e != &HirScalarExpr::column(i))
1980 {
1981 let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
1982 left_expr.map(left_casts).project(project_key)
1983 } else {
1984 left_expr
1985 };
1986 let rhs = if right_casts
1987 .iter()
1988 .enumerate()
1989 .any(|(i, e)| e != &HirScalarExpr::column(i))
1990 {
1991 let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
1992 right_expr.map(right_casts).project(project_key)
1993 } else {
1994 right_expr
1995 };
1996
1997 let relation_expr = match op {
1998 SetOperator::Union => {
1999 if *all {
2000 lhs.union(rhs)
2001 } else {
2002 lhs.union(rhs).distinct()
2003 }
2004 }
2005 SetOperator::Except => Hir::except(all, lhs, rhs),
2006 SetOperator::Intersect => {
2007 let left_clone = lhs.clone();
2013 if *all {
2014 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
2015 } else {
2016 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
2017 .distinct()
2018 }
2019 }
2020 };
2021 let scope = Scope::from_source(
2022 None,
2023 left_scope.column_names(),
2025 );
2026
2027 Ok((relation_expr, scope))
2028 }
2029 SetExpr::Values(Values(values)) => plan_values(qcx, values),
2030 SetExpr::Table(name) => {
2031 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2032 Ok((expr, scope))
2033 }
2034 SetExpr::Query(query) => {
2035 let (expr, scope) = plan_nested_query(qcx, query)?;
2036 Ok((expr, scope))
2037 }
2038 SetExpr::Show(stmt) => {
2039 if !qcx.lifetime.allow_show() {
2053 return Err(PlanError::ShowCommandInView);
2054 }
2055
2056 fn to_hirscope(
2059 plan: ShowCreatePlan,
2060 desc: StatementDesc,
2061 ) -> Result<(HirRelationExpr, Scope), PlanError> {
2062 let rows = vec![plan.row.iter().collect::<Vec<_>>()];
2063 let desc = desc.relation_desc.expect("must exist");
2064 let scope = Scope::from_source(None, desc.iter_names());
2065 let expr = HirRelationExpr::constant(rows, desc.into_typ());
2066 Ok((expr, scope))
2067 }
2068
2069 match stmt.clone() {
2070 ShowStatement::ShowColumns(stmt) => {
2071 show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
2072 }
2073 ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
2074 show::plan_show_create_connection(qcx.scx, stmt.clone())?,
2075 show::describe_show_create_connection(qcx.scx, stmt)?,
2076 ),
2077 ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
2078 show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
2079 show::describe_show_create_cluster(qcx.scx, stmt)?,
2080 ),
2081 ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
2082 show::plan_show_create_index(qcx.scx, stmt.clone())?,
2083 show::describe_show_create_index(qcx.scx, stmt)?,
2084 ),
2085 ShowStatement::ShowCreateSink(stmt) => to_hirscope(
2086 show::plan_show_create_sink(qcx.scx, stmt.clone())?,
2087 show::describe_show_create_sink(qcx.scx, stmt)?,
2088 ),
2089 ShowStatement::ShowCreateSource(stmt) => to_hirscope(
2090 show::plan_show_create_source(qcx.scx, stmt.clone())?,
2091 show::describe_show_create_source(qcx.scx, stmt)?,
2092 ),
2093 ShowStatement::ShowCreateTable(stmt) => to_hirscope(
2094 show::plan_show_create_table(qcx.scx, stmt.clone())?,
2095 show::describe_show_create_table(qcx.scx, stmt)?,
2096 ),
2097 ShowStatement::ShowCreateView(stmt) => to_hirscope(
2098 show::plan_show_create_view(qcx.scx, stmt.clone())?,
2099 show::describe_show_create_view(qcx.scx, stmt)?,
2100 ),
2101 ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
2102 show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
2103 show::describe_show_create_materialized_view(qcx.scx, stmt)?,
2104 ),
2105 ShowStatement::ShowCreateType(stmt) => to_hirscope(
2106 show::plan_show_create_type(qcx.scx, stmt.clone())?,
2107 show::describe_show_create_type(qcx.scx, stmt)?,
2108 ),
2109 ShowStatement::ShowObjects(stmt) => {
2110 show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
2111 }
2112 ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
2113 ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
2114 }
2115 }
2116 }
2117}
2118
2119fn plan_values(
2121 qcx: &QueryContext,
2122 values: &[Vec<Expr<Aug>>],
2123) -> Result<(HirRelationExpr, Scope), PlanError> {
2124 assert!(!values.is_empty());
2125
2126 let ecx = &ExprContext {
2127 qcx,
2128 name: "VALUES",
2129 scope: &Scope::empty(),
2130 relation_type: &SqlRelationType::empty(),
2131 allow_aggregates: false,
2132 allow_subqueries: true,
2133 allow_parameters: true,
2134 allow_windows: false,
2135 };
2136
2137 let ncols = values[0].len();
2138 let nrows = values.len();
2139
2140 let mut cols = vec![vec![]; ncols];
2143 for row in values {
2144 if row.len() != ncols {
2145 sql_bail!(
2146 "VALUES expression has varying number of columns: {} vs {}",
2147 row.len(),
2148 ncols
2149 );
2150 }
2151 for (i, v) in row.iter().enumerate() {
2152 cols[i].push(v);
2153 }
2154 }
2155
2156 let mut col_iters = Vec::with_capacity(ncols);
2158 let mut col_types = Vec::with_capacity(ncols);
2159 for col in &cols {
2160 let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2161 let mut col_type = ecx.column_type(&col[0]);
2162 for val in &col[1..] {
2163 col_type = col_type.sql_union(&ecx.column_type(val))?; }
2165 col_types.push(col_type);
2166 col_iters.push(col.into_iter());
2167 }
2168
2169 let mut exprs = vec![];
2171 for _ in 0..nrows {
2172 for i in 0..ncols {
2173 exprs.push(col_iters[i].next().unwrap());
2174 }
2175 }
2176 let out = HirRelationExpr::CallTable {
2177 func: TableFunc::Wrap {
2178 width: ncols,
2179 types: col_types,
2180 },
2181 exprs,
2182 };
2183
2184 let mut scope = Scope::empty();
2186 for i in 0..ncols {
2187 let name = format!("column{}", i + 1);
2188 scope.items.push(ScopeItem::from_column_name(name));
2189 }
2190
2191 Ok((out, scope))
2192}
2193
2194fn plan_values_insert(
2204 qcx: &QueryContext,
2205 target_names: &[&ColumnName],
2206 target_types: &[&SqlScalarType],
2207 values: &[Vec<Expr<Aug>>],
2208) -> Result<HirRelationExpr, PlanError> {
2209 assert!(!values.is_empty());
2210
2211 if !values.iter().map(|row| row.len()).all_equal() {
2212 sql_bail!("VALUES lists must all be the same length");
2213 }
2214
2215 let ecx = &ExprContext {
2216 qcx,
2217 name: "VALUES",
2218 scope: &Scope::empty(),
2219 relation_type: &SqlRelationType::empty(),
2220 allow_aggregates: false,
2221 allow_subqueries: true,
2222 allow_parameters: true,
2223 allow_windows: false,
2224 };
2225
2226 let mut exprs = vec![];
2227 let mut types = vec![];
2228 for row in values {
2229 if row.len() > target_names.len() {
2230 sql_bail!("INSERT has more expressions than target columns");
2231 }
2232 for (column, val) in row.into_iter().enumerate() {
2233 let target_type = &target_types[column];
2234 let val = plan_expr(ecx, val)?;
2235 let val = typeconv::plan_coerce(ecx, val, target_type)?;
2236 let source_type = &ecx.scalar_type(&val);
2237 let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2238 Ok(val) => val,
2239 Err(_) => sql_bail!(
2240 "column {} is of type {} but expression is of type {}",
2241 target_names[column].quoted(),
2242 qcx.humanize_scalar_type(target_type, false),
2243 qcx.humanize_scalar_type(source_type, false),
2244 ),
2245 };
2246 if column >= types.len() {
2247 types.push(ecx.column_type(&val));
2248 } else {
2249 types[column] = types[column].sql_union(&ecx.column_type(&val))?; }
2251 exprs.push(val);
2252 }
2253 }
2254
2255 Ok(HirRelationExpr::CallTable {
2256 func: TableFunc::Wrap {
2257 width: values[0].len(),
2258 types,
2259 },
2260 exprs,
2261 })
2262}
2263
2264fn plan_join_identity() -> (HirRelationExpr, Scope) {
2265 let typ = SqlRelationType::new(vec![]);
2266 let expr = HirRelationExpr::constant(vec![vec![]], typ);
2267 let scope = Scope::empty();
2268 (expr, scope)
2269}
2270
/// The result of planning a `SELECT` via `plan_select_from_where`.
///
/// The ordering and projection are returned alongside `expr` rather than
/// applied to it; callers such as `plan_query_inner` and `plan_set_expr`
/// decide how to apply them.
#[derive(Debug)]
struct SelectPlan {
    /// The planned relation expression.
    expr: HirRelationExpr,
    /// The scope describing the columns of the select.
    scope: Scope,
    /// The planned `ORDER BY`; empty when no ordering expressions were given.
    order_by: Vec<ColumnOrder>,
    /// The columns of `expr` that make up the select's output.
    project: Vec<usize>,
}
2283
// Generates `SelectOptionExtracted`, which is built from a `SELECT`'s options
// and then converted into `GroupSizeHints`. Every option carries a `u64`.
generate_extracted_config!(
    SelectOption,
    (ExpectedGroupSize, u64),
    (AggregateInputGroupSize, u64),
    (DistinctOnInputGroupSize, u64),
    (LimitInputGroupSize, u64)
);
2291
2292fn plan_select_from_where(
2310 qcx: &QueryContext,
2311 mut s: Select<Aug>,
2312 mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2313) -> Result<SelectPlan, PlanError> {
2314 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2321 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2322
2323 let (mut relation_expr, mut from_scope) =
2325 s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2326 let (left, left_scope) = l;
2327 plan_join(
2328 qcx,
2329 left,
2330 left_scope,
2331 &Join {
2332 relation: TableFactor::NestedJoin {
2333 join: Box::new(twj.clone()),
2334 alias: None,
2335 },
2336 join_operator: JoinOperator::CrossJoin,
2337 },
2338 )
2339 })?;
2340
2341 if let Some(selection) = &s.selection {
2343 let ecx = &ExprContext {
2344 qcx,
2345 name: "WHERE clause",
2346 scope: &from_scope,
2347 relation_type: &qcx.relation_type(&relation_expr),
2348 allow_aggregates: false,
2349 allow_subqueries: true,
2350 allow_parameters: true,
2351 allow_windows: false,
2352 };
2353 let expr = plan_expr(ecx, selection)
2354 .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2355 .type_as(ecx, &SqlScalarType::Bool)?;
2356 relation_expr = relation_expr.filter(vec![expr]);
2357 }
2358
2359 let (aggregates, table_funcs) = {
2362 let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2363 visitor.visit_select_mut(&mut s);
2364 for o in order_by_exprs.iter_mut() {
2365 visitor.visit_order_by_expr_mut(o);
2366 }
2367 visitor.into_result()?
2368 };
2369 let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2370 if !table_funcs.is_empty() {
2371 let (expr, scope) = plan_scalar_table_funcs(
2372 qcx,
2373 table_funcs,
2374 &mut table_func_names,
2375 &relation_expr,
2376 &from_scope,
2377 )?;
2378 relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2379 from_scope = from_scope.product(scope)?;
2380 }
2381
2382 let projection = {
2384 let ecx = &ExprContext {
2385 qcx,
2386 name: "SELECT clause",
2387 scope: &from_scope,
2388 relation_type: &qcx.relation_type(&relation_expr),
2389 allow_aggregates: true,
2390 allow_subqueries: true,
2391 allow_parameters: true,
2392 allow_windows: true,
2393 };
2394 let mut out = vec![];
2395 for si in &s.projection {
2396 if *si == SelectItem::Wildcard && s.from.is_empty() {
2397 sql_bail!("SELECT * with no tables specified is not valid");
2398 }
2399 out.extend(expand_select_item(ecx, si, &table_func_names)?);
2400 }
2401 out
2402 };
2403
2404 let (mut group_scope, select_all_mapping) = {
2408 let ecx = &ExprContext {
2410 qcx,
2411 name: "GROUP BY clause",
2412 scope: &from_scope,
2413 relation_type: &qcx.relation_type(&relation_expr),
2414 allow_aggregates: false,
2415 allow_subqueries: true,
2416 allow_parameters: true,
2417 allow_windows: false,
2418 };
2419 let mut group_key = vec![];
2420 let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2421 let mut group_hir_exprs = vec![];
2422 let mut group_scope = Scope::empty();
2423 let mut select_all_mapping = BTreeMap::new();
2424
2425 for group_expr in &s.group_by {
2426 let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2427 let new_column = group_key.len();
2428
2429 if let Some(group_expr) = group_expr {
2430 if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2434 existing_scope_item.exprs.insert(group_expr.clone());
2435 continue;
2436 }
2437 }
2438
2439 let mut scope_item = if let HirScalarExpr::Column(
2440 ColumnRef {
2441 level: 0,
2442 column: old_column,
2443 },
2444 _name,
2445 ) = &expr
2446 {
2447 select_all_mapping.insert(*old_column, new_column);
2453 let scope_item = ecx.scope.items[*old_column].clone();
2454 scope_item
2455 } else {
2456 ScopeItem::empty()
2457 };
2458
2459 if let Some(group_expr) = group_expr.cloned() {
2460 scope_item.exprs.insert(group_expr);
2461 }
2462
2463 group_key.push(from_scope.len() + group_exprs.len());
2464 group_hir_exprs.push(expr.clone());
2465 group_exprs.insert(expr, scope_item);
2466 }
2467
2468 assert_eq!(group_hir_exprs.len(), group_exprs.len());
2469 for expr in &group_hir_exprs {
2470 if let Some(scope_item) = group_exprs.remove(expr) {
2471 group_scope.items.push(scope_item);
2472 }
2473 }
2474
2475 let ecx = &ExprContext {
2477 qcx,
2478 name: "aggregate function",
2479 scope: &from_scope,
2480 relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2481 allow_aggregates: false,
2482 allow_subqueries: true,
2483 allow_parameters: true,
2484 allow_windows: false,
2485 };
2486 let mut agg_exprs = vec![];
2487 for sql_function in aggregates {
2488 if sql_function.over.is_some() {
2489 unreachable!(
2490 "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2491 );
2492 }
2493 agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2494 group_scope
2495 .items
2496 .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2497 }
2498 if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2499 relation_expr = relation_expr.map(group_hir_exprs).reduce(
2501 group_key,
2502 agg_exprs,
2503 group_size_hints.aggregate_input_group_size,
2504 );
2505
2506 for i in 0..from_scope.len() {
2512 if !select_all_mapping.contains_key(&i) {
2513 let scope_item = &ecx.scope.items[i];
2514 group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2515 table_name: scope_item.table_name.clone(),
2516 column_name: scope_item.column_name.clone(),
2517 allow_unqualified_references: scope_item.allow_unqualified_references,
2518 });
2519 }
2520 }
2521
2522 (group_scope, select_all_mapping)
2523 } else {
2524 (
2526 from_scope.clone(),
2527 (0..from_scope.len()).map(|i| (i, i)).collect(),
2528 )
2529 }
2530 };
2531
2532 if let Some(ref having) = s.having {
2534 let ecx = &ExprContext {
2535 qcx,
2536 name: "HAVING clause",
2537 scope: &group_scope,
2538 relation_type: &qcx.relation_type(&relation_expr),
2539 allow_aggregates: true,
2540 allow_subqueries: true,
2541 allow_parameters: true,
2542 allow_windows: false,
2543 };
2544 let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2545 relation_expr = relation_expr.filter(vec![expr]);
2546 }
2547
2548 let window_funcs = {
2561 let mut visitor = WindowFuncCollector::default();
2562 visitor.visit_select(&s);
2566 for o in order_by_exprs.iter() {
2567 visitor.visit_order_by_expr(o);
2568 }
2569 visitor.into_result()
2570 };
2571 for window_func in window_funcs {
2572 let ecx = &ExprContext {
2573 qcx,
2574 name: "window function",
2575 scope: &group_scope,
2576 relation_type: &qcx.relation_type(&relation_expr),
2577 allow_aggregates: true,
2578 allow_subqueries: true,
2579 allow_parameters: true,
2580 allow_windows: true,
2581 };
2582 relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2583 group_scope.items.push(ScopeItem::from_expr(window_func));
2584 }
2585 if let Some(ref qualify) = s.qualify {
2592 let ecx = &ExprContext {
2593 qcx,
2594 name: "QUALIFY clause",
2595 scope: &group_scope,
2596 relation_type: &qcx.relation_type(&relation_expr),
2597 allow_aggregates: true,
2598 allow_subqueries: true,
2599 allow_parameters: true,
2600 allow_windows: true,
2601 };
2602 let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2603 relation_expr = relation_expr.filter(vec![expr]);
2604 }
2605
2606 let output_columns = {
2608 let mut new_exprs = vec![];
2609 let mut new_type = qcx.relation_type(&relation_expr);
2610 let mut output_columns = vec![];
2611 for (select_item, column_name) in &projection {
2612 let ecx = &ExprContext {
2613 qcx,
2614 name: "SELECT clause",
2615 scope: &group_scope,
2616 relation_type: &new_type,
2617 allow_aggregates: true,
2618 allow_subqueries: true,
2619 allow_parameters: true,
2620 allow_windows: true,
2621 };
2622 let expr = match select_item {
2623 ExpandedSelectItem::InputOrdinal(i) => {
2624 if let Some(column) = select_all_mapping.get(i).copied() {
2625 HirScalarExpr::column(column)
2626 } else {
2627 return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2628 }
2629 }
2630 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2631 };
2632 if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2633 output_columns.push((column, column_name));
2635 } else {
2636 let typ = ecx.column_type(&expr);
2643 new_type.column_types.push(typ);
2644 new_exprs.push(expr);
2645 output_columns.push((group_scope.len(), column_name));
2646 group_scope
2647 .items
2648 .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2649 }
2650 }
2651 relation_expr = relation_expr.map(new_exprs);
2652 output_columns
2653 };
2654 let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2655
2656 let order_by = {
2658 let relation_type = qcx.relation_type(&relation_expr);
2659 let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2660 &ExprContext {
2661 qcx,
2662 name: "ORDER BY clause",
2663 scope: &group_scope,
2664 relation_type: &relation_type,
2665 allow_aggregates: true,
2666 allow_subqueries: true,
2667 allow_parameters: true,
2668 allow_windows: true,
2669 },
2670 &order_by_exprs,
2671 &output_columns,
2672 )?;
2673
2674 match s.distinct {
2675 None => relation_expr = relation_expr.map(map_exprs),
2676 Some(Distinct::EntireRow) => {
2677 if relation_type.arity() == 0 {
2678 sql_bail!("SELECT DISTINCT must have at least one column");
2679 }
2680 if !try_push_projection_order_by(
2684 &mut relation_expr,
2685 &mut project_key,
2686 &mut order_by,
2687 ) {
2688 sql_bail!(
2689 "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2690 );
2691 }
2692 assert!(map_exprs.is_empty());
2693 relation_expr = relation_expr.distinct();
2694 }
2695 Some(Distinct::On(exprs)) => {
2696 let ecx = &ExprContext {
2697 qcx,
2698 name: "DISTINCT ON clause",
2699 scope: &group_scope,
2700 relation_type: &qcx.relation_type(&relation_expr),
2701 allow_aggregates: true,
2702 allow_subqueries: true,
2703 allow_parameters: true,
2704 allow_windows: true,
2705 };
2706
2707 let mut distinct_exprs = vec![];
2708 for expr in &exprs {
2709 let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2710 distinct_exprs.push(expr);
2711 }
2712
2713 let mut distinct_key = vec![];
2714
2715 let arity = relation_type.arity();
2725 for ord in order_by.iter().take(distinct_exprs.len()) {
2726 let mut expr = &HirScalarExpr::column(ord.column);
2729 if ord.column >= arity {
2730 expr = &map_exprs[ord.column - arity];
2731 };
2732 match distinct_exprs.iter().position(move |e| e == expr) {
2733 None => sql_bail!(
2734 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2735 ),
2736 Some(pos) => {
2737 distinct_exprs.remove(pos);
2738 }
2739 }
2740 distinct_key.push(ord.column);
2741 }
2742
2743 for expr in distinct_exprs {
2745 let column = match expr {
2748 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2749 _ => {
2750 map_exprs.push(expr);
2751 arity + map_exprs.len() - 1
2752 }
2753 };
2754 distinct_key.push(column);
2755 }
2756
2757 let distinct_len = distinct_key.len();
2762 relation_expr = HirRelationExpr::top_k(
2763 relation_expr.map(map_exprs),
2764 distinct_key,
2765 order_by.iter().skip(distinct_len).cloned().collect(),
2766 Some(HirScalarExpr::literal(
2767 Datum::Int64(1),
2768 SqlScalarType::Int64,
2769 )),
2770 HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2771 group_size_hints.distinct_on_input_group_size,
2772 );
2773 }
2774 }
2775
2776 order_by
2777 };
2778
2779 let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2784
2785 Ok(SelectPlan {
2786 expr: relation_expr,
2787 scope,
2788 order_by,
2789 project: project_key,
2790 })
2791}
2792
2793fn plan_scalar_table_funcs(
2794 qcx: &QueryContext,
2795 table_funcs: BTreeMap<Function<Aug>, String>,
2796 table_func_names: &mut BTreeMap<String, Ident>,
2797 relation_expr: &HirRelationExpr,
2798 from_scope: &Scope,
2799) -> Result<(HirRelationExpr, Scope), PlanError> {
2800 let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2801
2802 for (table_func, id) in table_funcs.iter() {
2803 table_func_names.insert(
2804 id.clone(),
2805 Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2807 );
2808 }
2809 if table_funcs.len() == 1 {
2812 let (table_func, id) = table_funcs.iter().next().unwrap();
2813 let (expr, mut scope) =
2814 plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2815
2816 let num_cols = scope.len();
2818 for i in 0..scope.len() {
2819 scope.items[i].table_name = Some(PartialItemName {
2820 database: None,
2821 schema: None,
2822 item: id.clone(),
2823 });
2824 scope.items[i].from_single_column_function = num_cols == 1;
2825 scope.items[i].allow_unqualified_references = false;
2826 }
2827 return Ok((expr, scope));
2828 }
2829 let (expr, mut scope, num_cols) =
2831 plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2832
2833 let mut i = 0;
2835 for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2836 for _ in 0..num_cols {
2837 scope.items[i].table_name = Some(PartialItemName {
2838 database: None,
2839 schema: None,
2840 item: id.clone(),
2841 });
2842 scope.items[i].from_single_column_function = num_cols == 1;
2843 scope.items[i].allow_unqualified_references = false;
2844 i += 1;
2845 }
2846 scope.items[i].table_name = Some(PartialItemName {
2850 database: None,
2851 schema: None,
2852 item: id.clone(),
2853 });
2854 scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2855 scope.items[i].allow_unqualified_references = false;
2856 i += 1;
2857 }
2858 scope.items[i].allow_unqualified_references = false;
2860 Ok((expr, scope))
2861}
2862
2863fn plan_group_by_expr<'a>(
2870 ecx: &ExprContext,
2871 group_expr: &'a Expr<Aug>,
2872 projection: &'a [(ExpandedSelectItem, ColumnName)],
2873) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2874 let plan_projection = |column: usize| match &projection[column].0 {
2875 ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2876 ExpandedSelectItem::Expr(expr) => {
2877 Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2878 }
2879 };
2880
2881 if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2884 return plan_projection(column);
2885 }
2886
2887 match group_expr {
2891 Expr::Identifier(names) => match plan_identifier(ecx, names) {
2892 Err(PlanError::UnknownColumn {
2893 table: None,
2894 column,
2895 similar,
2896 }) => {
2897 let mut iter = projection.iter().map(|(_expr, name)| name);
2900 if let Some(i) = iter.position(|n| *n == column) {
2901 if iter.any(|n| *n == column) {
2902 Err(PlanError::AmbiguousColumn(column))
2903 } else {
2904 plan_projection(i)
2905 }
2906 } else {
2907 Err(PlanError::UnknownColumn {
2910 table: None,
2911 column,
2912 similar,
2913 })
2914 }
2915 }
2916 res => Ok((Some(group_expr), res?)),
2917 },
2918 _ => Ok((
2919 Some(group_expr),
2920 plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2921 )),
2922 }
2923}
2924
2925pub(crate) fn plan_order_by_exprs(
2933 ecx: &ExprContext,
2934 order_by_exprs: &[OrderByExpr<Aug>],
2935 output_columns: &[(usize, &ColumnName)],
2936) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2937 let mut order_by = vec![];
2938 let mut map_exprs = vec![];
2939 for obe in order_by_exprs {
2940 let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2941 let column = match expr {
2944 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2945 _ => {
2946 map_exprs.push(expr);
2947 ecx.relation_type.arity() + map_exprs.len() - 1
2948 }
2949 };
2950 order_by.push(resolve_desc_and_nulls_last(obe, column));
2951 }
2952 Ok((order_by, map_exprs))
2953}
2954
2955fn plan_order_by_or_distinct_expr(
2973 ecx: &ExprContext,
2974 expr: &Expr<Aug>,
2975 output_columns: &[(usize, &ColumnName)],
2976) -> Result<HirScalarExpr, PlanError> {
2977 if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2978 return Ok(HirScalarExpr::column(output_columns[i].0));
2979 }
2980
2981 if let Expr::Identifier(names) = expr {
2982 if let [name] = &names[..] {
2983 let name = normalize::column_name(name.clone());
2984 let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2985 if let Some((i, _)) = iter.next() {
2986 match iter.next() {
2987 Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2991 _ => return Ok(HirScalarExpr::column(*i)),
2992 }
2993 }
2994 }
2995 }
2996
2997 plan_expr(ecx, expr)?.type_as_any(ecx)
2998}
2999
3000fn plan_table_with_joins(
3001 qcx: &QueryContext,
3002 table_with_joins: &TableWithJoins<Aug>,
3003) -> Result<(HirRelationExpr, Scope), PlanError> {
3004 let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
3005 for join in &table_with_joins.joins {
3006 let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
3007 expr = new_expr;
3008 scope = new_scope;
3009 }
3010 Ok((expr, scope))
3011}
3012
3013fn plan_table_factor(
3014 qcx: &QueryContext,
3015 table_factor: &TableFactor<Aug>,
3016) -> Result<(HirRelationExpr, Scope), PlanError> {
3017 match table_factor {
3018 TableFactor::Table { name, alias } => {
3019 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
3020 let scope = plan_table_alias(scope, alias.as_ref())?;
3021 Ok((expr, scope))
3022 }
3023
3024 TableFactor::Function {
3025 function,
3026 alias,
3027 with_ordinality,
3028 } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
3029
3030 TableFactor::RowsFrom {
3031 functions,
3032 alias,
3033 with_ordinality,
3034 } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
3035
3036 TableFactor::Derived {
3037 lateral,
3038 subquery,
3039 alias,
3040 } => {
3041 let mut qcx = (*qcx).clone();
3042 if !lateral {
3043 for scope in &mut qcx.outer_scopes {
3047 if scope.lateral_barrier {
3048 break;
3049 }
3050 scope.items.clear();
3051 }
3052 }
3053 qcx.outer_scopes[0].lateral_barrier = true;
3054 let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3055 let scope = plan_table_alias(scope, alias.as_ref())?;
3056 Ok((expr, scope))
3057 }
3058
3059 TableFactor::NestedJoin { join, alias } => {
3060 let (expr, scope) = plan_table_with_joins(qcx, join)?;
3061 let scope = plan_table_alias(scope, alias.as_ref())?;
3062 Ok((expr, scope))
3063 }
3064 }
3065}
3066
3067fn plan_rows_from(
3109 qcx: &QueryContext,
3110 functions: &[Function<Aug>],
3111 alias: Option<&TableAlias>,
3112 with_ordinality: bool,
3113) -> Result<(HirRelationExpr, Scope), PlanError> {
3114 if let [function] = functions {
3117 return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3118 }
3119
3120 let (expr, mut scope, num_cols) = plan_rows_from_internal(
3124 qcx,
3125 functions,
3126 Some(functions[0].name.full_item_name().clone()),
3127 )?;
3128
3129 let mut columns = Vec::new();
3131 let mut offset = 0;
3132 for (idx, cols) in num_cols.into_iter().enumerate() {
3134 for i in 0..cols {
3135 columns.push(offset + i);
3136 }
3137 offset += cols + 1;
3138
3139 scope.items.remove(offset - idx - 1);
3142 }
3143
3144 if with_ordinality {
3147 columns.push(offset);
3148 } else {
3149 scope.items.pop();
3150 }
3151
3152 let expr = expr.project(columns);
3153
3154 let scope = plan_table_alias(scope, alias)?;
3155 Ok((expr, scope))
3156}
3157
/// Plans several table functions side by side, lining up their rows by
/// ordinality.
///
/// Each function is planned with an ordinality column. The output relation
/// holds, for every function in order, its columns followed by its own
/// ordinality column, and ends with one extra column that is the coalescence
/// of all ordinality columns. The returned scope mirrors that layout; the
/// final item is named `ORDINALITY_COL_NAME`.
///
/// The returned vector gives, per function, its number of columns excluding
/// the ordinality column.
///
/// If `table_name` is provided it is forwarded to
/// `plan_table_function_internal` for every function.
///
/// # Panics
///
/// Panics if `functions` yields no elements.
fn plan_rows_from_internal<'a>(
    qcx: &QueryContext,
    functions: impl IntoIterator<Item = &'a Function<Aug>>,
    table_name: Option<FullItemName>,
) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
    let mut functions = functions.into_iter();
    let mut num_cols = Vec::new();

    // Plan the first function on its own. Throughout, the last column of
    // `left_expr` is the column that the next function is joined against.
    let (mut left_expr, mut left_scope) =
        plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
    // The scope includes the ordinality column, which is not counted.
    num_cols.push(left_scope.len() - 1);
    // Seed the coalesced ordinality column with a copy of the first
    // function's ordinality column.
    left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
    left_scope
        .items
        .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));

    for function in functions {
        // The right-hand function is planned in an empty derived context.
        let qcx = qcx.empty_derived_context();
        let (right_expr, mut right_scope) =
            plan_table_function_internal(&qcx, function, true, table_name.clone())?;
        num_cols.push(right_scope.len() - 1);
        // Last column on the left: the coalesced ordinality so far.
        let left_col = left_scope.len() - 1;
        // Last column of the joined relation: the right function's ordinality.
        let right_col = left_scope.len() + right_scope.len() - 1;
        let on = HirScalarExpr::call_binary(
            HirScalarExpr::column(left_col),
            HirScalarExpr::column(right_col),
            expr_func::Eq,
        );
        // Full outer join on ordinality, then append the new coalesced
        // ordinality at index `right_col + 1`.
        left_expr = left_expr
            .join(right_expr, on, JoinKind::FullOuter)
            .map(vec![HirScalarExpr::call_variadic(
                Coalesce,
                vec![
                    HirScalarExpr::column(left_col),
                    HirScalarExpr::column(right_col),
                ],
            )]);

        // Drop the previous coalesced column (at `left_col`) and keep
        // everything else, including the newly appended coalesced column.
        left_expr = left_expr.project(
            (0..left_col) // everything on the left before the old coalesced column
                .chain(left_col + 1..right_col + 2) // right function's columns plus the new coalesced column
                .collect(),
        );
        // Move the coalesced ordinality scope item to the end of the
        // combined scope.
        right_scope.items.push(left_scope.items.pop().unwrap());

        left_scope.items.extend(right_scope.items);
    }

    Ok((left_expr, left_scope, num_cols))
}
3238
3239fn plan_solitary_table_function(
3243 qcx: &QueryContext,
3244 function: &Function<Aug>,
3245 alias: Option<&TableAlias>,
3246 with_ordinality: bool,
3247) -> Result<(HirRelationExpr, Scope), PlanError> {
3248 let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3249
3250 let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3251 if single_column_function {
3252 let item = &mut scope.items[0];
3253
3254 item.from_single_column_function = true;
3257
3258 if let Some(alias) = alias {
3273 if let ScopeItem {
3274 table_name: Some(table_name),
3275 column_name,
3276 ..
3277 } = item
3278 {
3279 if table_name.item.as_str() == column_name.as_str() {
3280 *column_name = normalize::column_name(alias.name.clone());
3281 }
3282 }
3283 }
3284 }
3285
3286 let scope = plan_table_alias(scope, alias)?;
3287 Ok((expr, scope))
3288}
3289
3290fn plan_table_function_internal(
3295 qcx: &QueryContext,
3296 Function {
3297 name,
3298 args,
3299 filter,
3300 over,
3301 distinct,
3302 }: &Function<Aug>,
3303 with_ordinality: bool,
3304 table_name: Option<FullItemName>,
3305) -> Result<(HirRelationExpr, Scope), PlanError> {
3306 assert_none!(filter, "cannot parse table function with FILTER");
3307 assert_none!(over, "cannot parse table function with OVER");
3308 assert!(!*distinct, "cannot parse table function with DISTINCT");
3309
3310 let ecx = &ExprContext {
3311 qcx,
3312 name: "table function arguments",
3313 scope: &Scope::empty(),
3314 relation_type: &SqlRelationType::empty(),
3315 allow_aggregates: false,
3316 allow_subqueries: true,
3317 allow_parameters: true,
3318 allow_windows: false,
3319 };
3320
3321 let scalar_args = match args {
3322 FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3323 FunctionArgs::Args { args, order_by } => {
3324 if !order_by.is_empty() {
3325 sql_bail!(
3326 "ORDER BY specified, but {} is not an aggregate function",
3327 name
3328 );
3329 }
3330 plan_exprs(ecx, args)?
3331 }
3332 };
3333
3334 let table_name = match table_name {
3335 Some(table_name) => table_name.item,
3336 None => name.full_item_name().item.clone(),
3337 };
3338
3339 let scope_name = Some(PartialItemName {
3340 database: None,
3341 schema: None,
3342 item: table_name,
3343 });
3344
3345 let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3346 Func::Table(impls) => {
3347 let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3348 let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3349 let expr = match tf.imp {
3350 TableFuncImpl::CallTable { mut func, exprs } => {
3351 if with_ordinality {
3352 func = TableFunc::with_ordinality(func.clone()).ok_or(
3353 PlanError::Unsupported {
3354 feature: format!("WITH ORDINALITY on {}", func),
3355 discussion_no: None,
3356 },
3357 )?;
3358 }
3359 HirRelationExpr::CallTable { func, exprs }
3360 }
3361 TableFuncImpl::Expr(expr) => {
3362 if !with_ordinality {
3363 expr
3364 } else {
3365 if qcx
3369 .scx
3370 .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3371 {
3372 tracing::error!(
3376 %name,
3377 "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3378 );
3379 expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3380 func: WindowExprType::Scalar(ScalarWindowExpr {
3381 func: ScalarWindowFunc::RowNumber,
3382 order_by: vec![],
3383 }),
3384 partition_by: vec![],
3385 order_by: vec![],
3386 })])
3387 } else {
3388 bail_unsupported!(format!(
3389 "WITH ORDINALITY or ROWS FROM with {}",
3390 name
3391 ));
3392 }
3393 }
3394 }
3395 };
3396 (expr, scope)
3397 }
3398 Func::Scalar(impls) => {
3399 let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3400 let output = expr.typ(
3401 &qcx.outer_relation_types,
3402 &SqlRelationType::new(vec![]),
3403 &qcx.scx.param_types.borrow(),
3404 );
3405
3406 let relation = SqlRelationType::new(vec![output]);
3407
3408 let function_ident = Ident::new(name.full_item_name().item.clone())?;
3409 let column_name = normalize::column_name(function_ident);
3410 let name = column_name.to_string();
3411
3412 let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3413
3414 let mut func = TableFunc::TabletizedScalar { relation, name };
3415 if with_ordinality {
3416 func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3417 feature: format!("WITH ORDINALITY on {}", func),
3418 discussion_no: None,
3419 })?;
3420 }
3421 (
3422 HirRelationExpr::CallTable {
3423 func,
3424 exprs: vec![expr],
3425 },
3426 scope,
3427 )
3428 }
3429 o => sql_bail!(
3430 "{} functions are not supported in functions in FROM",
3431 o.class()
3432 ),
3433 };
3434
3435 if with_ordinality {
3436 scope
3437 .items
3438 .push(ScopeItem::from_name(scope_name, "ordinality"));
3439 }
3440
3441 Ok((expr, scope))
3442}
3443
3444fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3445 if let Some(TableAlias {
3446 name,
3447 columns,
3448 strict,
3449 }) = alias
3450 {
3451 if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3452 sql_bail!(
3453 "{} has {} columns available but {} columns specified",
3454 name,
3455 scope.items.len(),
3456 columns.len()
3457 );
3458 }
3459
3460 let table_name = normalize::ident(name.to_owned());
3461 for (i, item) in scope.items.iter_mut().enumerate() {
3462 item.table_name = if item.allow_unqualified_references {
3463 Some(PartialItemName {
3464 database: None,
3465 schema: None,
3466 item: table_name.clone(),
3467 })
3468 } else {
3469 None
3503 };
3504 item.column_name = columns
3505 .get(i)
3506 .map(|a| normalize::column_name(a.clone()))
3507 .unwrap_or_else(|| item.column_name.clone());
3508 }
3509 }
3510 Ok(scope)
3511}
3512
3513fn invent_column_name(
3517 ecx: &ExprContext,
3518 expr: &Expr<Aug>,
3519 table_func_names: &BTreeMap<String, Ident>,
3520) -> Option<ColumnName> {
3521 #[derive(Debug)]
3528 enum NameQuality {
3529 Low,
3530 High,
3531 }
3532
3533 fn invent(
3534 ecx: &ExprContext,
3535 expr: &Expr<Aug>,
3536 table_func_names: &BTreeMap<String, Ident>,
3537 ) -> Option<(ColumnName, NameQuality)> {
3538 match expr {
3539 Expr::Identifier(names) => {
3540 if let [name] = names.as_slice() {
3541 if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3542 return Some((
3543 normalize::column_name(table_func_name.clone()),
3544 NameQuality::High,
3545 ));
3546 }
3547 }
3548 names
3549 .last()
3550 .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3551 }
3552 Expr::Value(v) => match v {
3553 Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3556 Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3557 _ => None,
3558 },
3559 Expr::Function(func) => {
3560 let (schema, item) = match &func.name {
3561 ResolvedItemName::Item {
3562 qualifiers,
3563 full_name,
3564 ..
3565 } => (&qualifiers.schema_spec, full_name.item.clone()),
3566 _ => unreachable!(),
3567 };
3568
3569 if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3570 || schema
3571 == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3572 {
3573 None
3574 } else {
3575 Some((item.into(), NameQuality::High))
3576 }
3577 }
3578 Expr::HomogenizingFunction { function, .. } => Some((
3579 function.to_string().to_lowercase().into(),
3580 NameQuality::High,
3581 )),
3582 Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3583 Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3584 Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3585 Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3586 Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
3587 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3588 _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3589 },
3590 Expr::Case { else_result, .. } => {
3591 match else_result
3592 .as_ref()
3593 .and_then(|else_result| invent(ecx, else_result, table_func_names))
3594 {
3595 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3596 _ => Some(("case".into(), NameQuality::Low)),
3597 }
3598 }
3599 Expr::FieldAccess { field, .. } => {
3600 Some((normalize::column_name(field.clone()), NameQuality::High))
3601 }
3602 Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3603 Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
3604 Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3605 let (_expr, scope) =
3609 plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
3610 scope
3611 .items
3612 .first()
3613 .map(|name| (name.column_name.clone(), NameQuality::High))
3614 }
3615 Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3616 _ => None,
3617 }
3618 }
3619
3620 invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
3621}
3622
/// A single select item after wildcard expansion, as produced by
/// `expand_select_item`.
#[derive(Debug)]
enum ExpandedSelectItem<'a> {
    /// Refers to the input scope item at the given index.
    InputOrdinal(usize),
    /// An expression still to be planned: either borrowed from the original
    /// select item or synthesized (owned) during expansion.
    Expr(Cow<'a, Expr<Aug>>),
}
3628
3629impl ExpandedSelectItem<'_> {
3630 fn as_expr(&self) -> Option<&Expr<Aug>> {
3631 match self {
3632 ExpandedSelectItem::InputOrdinal(_) => None,
3633 ExpandedSelectItem::Expr(expr) => Some(expr),
3634 }
3635 }
3636}
3637
3638fn expand_select_item<'a>(
3639 ecx: &ExprContext,
3640 s: &'a SelectItem<Aug>,
3641 table_func_names: &BTreeMap<String, Ident>,
3642) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3643 match s {
3644 SelectItem::Expr {
3645 expr: Expr::QualifiedWildcard(table_name),
3646 alias: _,
3647 } => {
3648 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3649 let table_name =
3650 normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3651 let out: Vec<_> = ecx
3652 .scope
3653 .items
3654 .iter()
3655 .enumerate()
3656 .filter(|(_i, item)| item.is_from_table(&table_name))
3657 .map(|(i, item)| {
3658 let name = item.column_name.clone();
3659 (ExpandedSelectItem::InputOrdinal(i), name)
3660 })
3661 .collect();
3662 if out.is_empty() {
3663 sql_bail!("no table named '{}' in scope", table_name);
3664 }
3665 Ok(out)
3666 }
3667 SelectItem::Expr {
3668 expr: Expr::WildcardAccess(sql_expr),
3669 alias: _,
3670 } => {
3671 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3672 let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3678 let fields = match ecx.scalar_type(&expr) {
3679 SqlScalarType::Record { fields, .. } => fields,
3680 ty => sql_bail!(
3681 "type {} is not composite",
3682 ecx.humanize_scalar_type(&ty, false)
3683 ),
3684 };
3685 let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3686 if let Expr::Identifier(ident) = sql_expr.as_ref() {
3687 if let [name] = ident.as_slice() {
3688 if let Ok(items) = ecx.scope.items_from_table(
3689 &[],
3690 &PartialItemName {
3691 database: None,
3692 schema: None,
3693 item: name.as_str().to_string(),
3694 },
3695 ) {
3696 for (_, item) in items {
3697 if item
3698 .is_exists_column_for_a_table_function_that_was_in_the_target_list
3699 {
3700 skip_cols.insert(item.column_name.clone());
3701 }
3702 }
3703 }
3704 }
3705 }
3706 let items = fields
3707 .iter()
3708 .filter_map(|(name, _ty)| {
3709 if skip_cols.contains(name) {
3710 None
3711 } else {
3712 let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3713 expr: sql_expr.clone(),
3714 field: name.clone().into(),
3715 }));
3716 Some((item, name.clone()))
3717 }
3718 })
3719 .collect();
3720 Ok(items)
3721 }
3722 SelectItem::Wildcard => {
3723 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3724 let items: Vec<_> = ecx
3725 .scope
3726 .items
3727 .iter()
3728 .enumerate()
3729 .filter(|(_i, item)| item.allow_unqualified_references)
3730 .map(|(i, item)| {
3731 let name = item.column_name.clone();
3732 (ExpandedSelectItem::InputOrdinal(i), name)
3733 })
3734 .collect();
3735
3736 Ok(items)
3737 }
3738 SelectItem::Expr { expr, alias } => {
3739 let name = alias
3740 .clone()
3741 .map(normalize::column_name)
3742 .or_else(|| invent_column_name(ecx, expr, table_func_names))
3743 .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into());
3744 Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3745 }
3746 }
3747}
3748
3749fn plan_join(
3750 left_qcx: &QueryContext,
3751 left: HirRelationExpr,
3752 left_scope: Scope,
3753 join: &Join<Aug>,
3754) -> Result<(HirRelationExpr, Scope), PlanError> {
3755 const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3756 let (kind, constraint) = match &join.join_operator {
3757 JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3758 JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3759 JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3760 JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3761 JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3762 };
3763
3764 let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3765 if !kind.can_be_correlated() {
3766 for item in &mut right_qcx.outer_scopes[0].items {
3767 item.error_if_referenced =
3772 Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3773 table: table.cloned(),
3774 column: column.clone(),
3775 });
3776 }
3777 }
3778 let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3779
3780 let (expr, scope) = match constraint {
3781 JoinConstraint::On(expr) => {
3782 let product_scope = left_scope.product(right_scope)?;
3783 let ecx = &ExprContext {
3784 qcx: left_qcx,
3785 name: "ON clause",
3786 scope: &product_scope,
3787 relation_type: &SqlRelationType::new(
3788 left_qcx
3789 .relation_type(&left)
3790 .column_types
3791 .into_iter()
3792 .chain(right_qcx.relation_type(&right).column_types)
3793 .collect(),
3794 ),
3795 allow_aggregates: false,
3796 allow_subqueries: true,
3797 allow_parameters: true,
3798 allow_windows: false,
3799 };
3800 let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3801 let joined = left.join(right, on, kind);
3802 (joined, product_scope)
3803 }
3804 JoinConstraint::Using { columns, alias } => {
3805 let column_names = columns
3806 .iter()
3807 .map(|ident| normalize::column_name(ident.clone()))
3808 .collect::<Vec<_>>();
3809
3810 plan_using_constraint(
3811 &column_names,
3812 left_qcx,
3813 left,
3814 left_scope,
3815 &right_qcx,
3816 right,
3817 right_scope,
3818 kind,
3819 alias.as_ref(),
3820 )?
3821 }
3822 JoinConstraint::Natural => {
3823 *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3826 *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3827 let left_column_names = left_scope.column_names();
3828 let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3829 let column_names: Vec<_> = left_column_names
3830 .filter(|col| right_column_names.contains(col))
3831 .cloned()
3832 .collect();
3833 plan_using_constraint(
3834 &column_names,
3835 left_qcx,
3836 left,
3837 left_scope,
3838 &right_qcx,
3839 right,
3840 right_scope,
3841 kind,
3842 None,
3843 )?
3844 }
3845 };
3846 Ok((expr, scope))
3847}
3848
3849#[allow(clippy::too_many_arguments)]
3851fn plan_using_constraint(
3852 column_names: &[ColumnName],
3853 left_qcx: &QueryContext,
3854 left: HirRelationExpr,
3855 left_scope: Scope,
3856 right_qcx: &QueryContext,
3857 right: HirRelationExpr,
3858 right_scope: Scope,
3859 kind: JoinKind,
3860 alias: Option<&Ident>,
3861) -> Result<(HirRelationExpr, Scope), PlanError> {
3862 let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3863
3864 let mut unique_column_names = BTreeSet::new();
3867 for c in column_names {
3868 if !unique_column_names.insert(c) {
3869 return Err(PlanError::Unsupported {
3870 feature: format!(
3871 "column name {} appears more than once in USING clause",
3872 c.quoted()
3873 ),
3874 discussion_no: None,
3875 });
3876 }
3877 }
3878
3879 let alias_item_name = alias.map(|alias| PartialItemName {
3880 database: None,
3881 schema: None,
3882 item: alias.clone().to_string(),
3883 });
3884
3885 if let Some(alias_item_name) = &alias_item_name {
3886 for partial_item_name in both_scope.table_names() {
3887 if partial_item_name.matches(alias_item_name) {
3888 sql_bail!(
3889 "table name \"{}\" specified more than once",
3890 alias_item_name
3891 )
3892 }
3893 }
3894 }
3895
3896 let ecx = &ExprContext {
3897 qcx: right_qcx,
3898 name: "USING clause",
3899 scope: &both_scope,
3900 relation_type: &SqlRelationType::new(
3901 left_qcx
3902 .relation_type(&left)
3903 .column_types
3904 .into_iter()
3905 .chain(right_qcx.relation_type(&right).column_types)
3906 .collect(),
3907 ),
3908 allow_aggregates: false,
3909 allow_subqueries: false,
3910 allow_parameters: false,
3911 allow_windows: false,
3912 };
3913
3914 let mut join_exprs = vec![];
3915 let mut map_exprs = vec![];
3916 let mut new_items = vec![];
3917 let mut join_cols = vec![];
3918 let mut hidden_cols = vec![];
3919
3920 for column_name in column_names {
3921 let (lhs, lhs_name) = left_scope.resolve_using_column(
3923 column_name,
3924 JoinSide::Left,
3925 &mut left_qcx.name_manager.borrow_mut(),
3926 )?;
3927 let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3928 column_name,
3929 JoinSide::Right,
3930 &mut right_qcx.name_manager.borrow_mut(),
3931 )?;
3932
3933 rhs.column += left_scope.len();
3935
3936 let mut exprs = coerce_homogeneous_exprs(
3938 &ecx.with_name(&format!(
3939 "NATURAL/USING join column {}",
3940 column_name.quoted()
3941 )),
3942 vec![
3943 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3944 lhs,
3945 Arc::clone(&lhs_name),
3946 )),
3947 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3948 rhs,
3949 Arc::clone(&rhs_name),
3950 )),
3951 ],
3952 None,
3953 )?;
3954 let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3955
3956 match kind {
3957 JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3958 join_cols.push(lhs.column);
3959 hidden_cols.push(rhs.column);
3960 }
3961 JoinKind::RightOuter => {
3962 join_cols.push(rhs.column);
3963 hidden_cols.push(lhs.column);
3964 }
3965 JoinKind::FullOuter => {
3966 join_cols.push(both_scope.items.len() + map_exprs.len());
3969 hidden_cols.push(lhs.column);
3970 hidden_cols.push(rhs.column);
3971 map_exprs.push(HirScalarExpr::call_variadic(
3972 Coalesce,
3973 vec![expr1.clone(), expr2.clone()],
3974 ));
3975 new_items.push(ScopeItem::from_column_name(column_name));
3976 }
3977 }
3978
3979 if alias_item_name.is_some() {
3984 let new_item_col = both_scope.items.len() + new_items.len();
3985 join_cols.push(new_item_col);
3986 hidden_cols.push(new_item_col);
3987
3988 new_items.push(ScopeItem::from_name(
3989 alias_item_name.clone(),
3990 column_name.clone().to_string(),
3991 ));
3992
3993 map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
3997 }
3998
3999 join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
4000 }
4001 both_scope.items.extend(new_items);
4002
4003 for c in hidden_cols {
4007 both_scope.items[c].allow_unqualified_references = false;
4008 }
4009
4010 let project_key = join_cols
4012 .into_iter()
4013 .chain(0..both_scope.items.len())
4014 .unique()
4015 .collect::<Vec<_>>();
4016
4017 both_scope = both_scope.project(&project_key);
4018
4019 let on = HirScalarExpr::variadic_and(join_exprs);
4020
4021 let both = left
4022 .join(right, on, kind)
4023 .map(map_exprs)
4024 .project(project_key);
4025 Ok((both, both_scope))
4026}
4027
4028pub fn plan_expr<'a>(
4029 ecx: &'a ExprContext,
4030 e: &Expr<Aug>,
4031) -> Result<CoercibleScalarExpr, PlanError> {
4032 ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
4033}
4034
4035fn plan_expr_inner<'a>(
4036 ecx: &'a ExprContext,
4037 e: &Expr<Aug>,
4038) -> Result<CoercibleScalarExpr, PlanError> {
4039 if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4040 return Ok(HirScalarExpr::named_column(
4042 i,
4043 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4044 )
4045 .into());
4046 }
4047
4048 match e {
4049 Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4051 Ok(plan_identifier(ecx, names)?.into())
4052 }
4053
4054 Expr::Value(val) => plan_literal(val),
4056 Expr::Parameter(n) => plan_parameter(ecx, *n),
4057 Expr::Array(exprs) => plan_array(ecx, exprs, None),
4058 Expr::List(exprs) => plan_list(ecx, exprs, None),
4059 Expr::Map(exprs) => plan_map(ecx, exprs, None),
4060 Expr::Row { exprs } => plan_row(ecx, exprs),
4061
4062 Expr::Op { op, expr1, expr2 } => {
4064 Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4065 }
4066 Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4067 Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4068
4069 Expr::Not { expr } => plan_not(ecx, expr),
4071 Expr::And { left, right } => plan_and(ecx, left, right),
4072 Expr::Or { left, right } => plan_or(ecx, left, right),
4073 Expr::IsExpr {
4074 expr,
4075 construct,
4076 negated,
4077 } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4078 Expr::Case {
4079 operand,
4080 conditions,
4081 results,
4082 else_result,
4083 } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4084 Expr::HomogenizingFunction { function, exprs } => {
4085 plan_homogenizing_function(ecx, function, exprs)
4086 }
4087 Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4088 ecx,
4089 &None,
4090 &[l_expr.clone().equals(*r_expr.clone())],
4091 &[Expr::null()],
4092 &Some(Box::new(*l_expr.clone())),
4093 )?
4094 .into()),
4095 Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4096 Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4097 Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4098 Expr::Like {
4099 expr,
4100 pattern,
4101 escape,
4102 case_insensitive,
4103 negated,
4104 } => Ok(plan_like(
4105 ecx,
4106 expr,
4107 pattern,
4108 escape.as_deref(),
4109 *case_insensitive,
4110 *negated,
4111 )?
4112 .into()),
4113
4114 Expr::InList {
4115 expr,
4116 list,
4117 negated,
4118 } => plan_in_list(ecx, expr, list, negated),
4119
4120 Expr::Exists(query) => plan_exists(ecx, query),
4122 Expr::Subquery(query) => plan_subquery(ecx, query),
4123 Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4124 Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4125 Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4126 Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4127 Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
4128 Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
4129 Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
4130 Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
4131 Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
4132 Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
4133 Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
4134 }
4135}
4136
4137fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4138 if !ecx.allow_parameters {
4139 return Err(PlanError::UnknownParameter(n));
4143 }
4144 if n == 0 || n > 65536 {
4145 return Err(PlanError::UnknownParameter(n));
4146 }
4147 if ecx.param_types().borrow().contains_key(&n) {
4148 Ok(HirScalarExpr::parameter(n).into())
4149 } else {
4150 Ok(CoercibleScalarExpr::Parameter(n))
4151 }
4152}
4153
4154fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4155 let mut out = vec![];
4156 for e in exprs {
4157 out.push(plan_expr(ecx, e)?);
4158 }
4159 Ok(CoercibleScalarExpr::LiteralRecord(out))
4160}
4161
4162fn plan_cast(
4163 ecx: &ExprContext,
4164 expr: &Expr<Aug>,
4165 data_type: &ResolvedDataType,
4166) -> Result<CoercibleScalarExpr, PlanError> {
4167 let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4168 let expr = match expr {
4169 Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4178 Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4179 Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4180 _ => plan_expr(ecx, expr)?,
4181 };
4182 let ecx = &ecx.with_name("CAST");
4183 let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4184 let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4185 Ok(expr.into())
4186}
4187
4188fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4189 let ecx = ecx.with_name("NOT argument");
4190 Ok(plan_expr(&ecx, expr)?
4191 .type_as(&ecx, &SqlScalarType::Bool)?
4192 .call_unary(UnaryFunc::Not(expr_func::Not))
4193 .into())
4194}
4195
4196fn plan_and(
4197 ecx: &ExprContext,
4198 left: &Expr<Aug>,
4199 right: &Expr<Aug>,
4200) -> Result<CoercibleScalarExpr, PlanError> {
4201 let ecx = ecx.with_name("AND argument");
4202 Ok(HirScalarExpr::variadic_and(vec![
4203 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4204 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4205 ])
4206 .into())
4207}
4208
4209fn plan_or(
4210 ecx: &ExprContext,
4211 left: &Expr<Aug>,
4212 right: &Expr<Aug>,
4213) -> Result<CoercibleScalarExpr, PlanError> {
4214 let ecx = ecx.with_name("OR argument");
4215 Ok(HirScalarExpr::variadic_or(vec![
4216 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4217 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4218 ])
4219 .into())
4220}
4221
4222fn plan_in_list(
4223 ecx: &ExprContext,
4224 lhs: &Expr<Aug>,
4225 list: &Vec<Expr<Aug>>,
4226 negated: &bool,
4227) -> Result<CoercibleScalarExpr, PlanError> {
4228 let ecx = ecx.with_name("IN list");
4229 let or = HirScalarExpr::variadic_or(
4230 list.into_iter()
4231 .map(|e| {
4232 let eq = lhs.clone().equals(e.clone());
4233 plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4234 })
4235 .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4236 );
4237 Ok(if *negated {
4238 or.call_unary(UnaryFunc::Not(expr_func::Not))
4239 } else {
4240 or
4241 }
4242 .into())
4243}
4244
4245fn plan_homogenizing_function(
4246 ecx: &ExprContext,
4247 function: &HomogenizingFunction,
4248 exprs: &[Expr<Aug>],
4249) -> Result<CoercibleScalarExpr, PlanError> {
4250 assert!(!exprs.is_empty()); let expr = HirScalarExpr::call_variadic(
4252 match function {
4253 HomogenizingFunction::Coalesce => VariadicFunc::from(Coalesce),
4254 HomogenizingFunction::Greatest => VariadicFunc::from(Greatest),
4255 HomogenizingFunction::Least => VariadicFunc::from(Least),
4256 },
4257 coerce_homogeneous_exprs(
4258 &ecx.with_name(&function.to_string().to_lowercase()),
4259 plan_exprs(ecx, exprs)?,
4260 None,
4261 )?,
4262 );
4263 Ok(expr.into())
4264}
4265
4266fn plan_field_access(
4267 ecx: &ExprContext,
4268 expr: &Expr<Aug>,
4269 field: &Ident,
4270) -> Result<CoercibleScalarExpr, PlanError> {
4271 let field = normalize::column_name(field.clone());
4272 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4273 let ty = ecx.scalar_type(&expr);
4274 let i = match &ty {
4275 SqlScalarType::Record { fields, .. } => {
4276 fields.iter().position(|(name, _ty)| *name == field)
4277 }
4278 ty => sql_bail!(
4279 "column notation applied to type {}, which is not a composite type",
4280 ecx.humanize_scalar_type(ty, false)
4281 ),
4282 };
4283 match i {
4284 None => sql_bail!(
4285 "field {} not found in data type {}",
4286 field,
4287 ecx.humanize_scalar_type(&ty, false)
4288 ),
4289 Some(i) => Ok(expr
4290 .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4291 .into()),
4292 }
4293}
4294
4295fn plan_subscript(
4296 ecx: &ExprContext,
4297 expr: &Expr<Aug>,
4298 positions: &[SubscriptPosition<Aug>],
4299) -> Result<CoercibleScalarExpr, PlanError> {
4300 assert!(
4301 !positions.is_empty(),
4302 "subscript expression must contain at least one position"
4303 );
4304
4305 let ecx = &ecx.with_name("subscripting");
4306 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4307 let ty = ecx.scalar_type(&expr);
4308 match &ty {
4309 SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4310 ecx,
4311 expr,
4312 positions,
4313 if ty == SqlScalarType::Int2Vector {
4317 1
4318 } else {
4319 0
4320 },
4321 ),
4322 SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4323 SqlScalarType::List { element_type, .. } => {
4324 let elem_type_name = ecx.humanize_scalar_type(element_type, false);
4326 let n_layers = ty.unwrap_list_n_layers();
4327 plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4328 }
4329 ty => sql_bail!(
4330 "cannot subscript type {}",
4331 ecx.humanize_scalar_type(ty, false)
4332 ),
4333 }
4334}
4335
4336fn extract_scalar_subscript_from_positions<'a>(
4340 positions: &'a [SubscriptPosition<Aug>],
4341 expr_type_name: &str,
4342) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4343 let mut scalar_subscripts = Vec::with_capacity(positions.len());
4344 for p in positions {
4345 if p.explicit_slice {
4346 sql_bail!("{} subscript does not support slices", expr_type_name);
4347 }
4348 assert!(
4349 p.end.is_none(),
4350 "index-appearing subscripts cannot have end value"
4351 );
4352 scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4353 }
4354 Ok(scalar_subscripts)
4355}
4356
4357fn plan_subscript_array(
4358 ecx: &ExprContext,
4359 expr: HirScalarExpr,
4360 positions: &[SubscriptPosition<Aug>],
4361 offset: i64,
4362) -> Result<CoercibleScalarExpr, PlanError> {
4363 let mut exprs = Vec::with_capacity(positions.len() + 1);
4364 exprs.push(expr);
4365
4366 let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4369
4370 for i in indexes {
4371 exprs.push(plan_expr(ecx, i)?.cast_to(
4372 ecx,
4373 CastContext::Explicit,
4374 &SqlScalarType::Int64,
4375 )?);
4376 }
4377
4378 Ok(HirScalarExpr::call_variadic(ArrayIndex { offset }, exprs).into())
4379}
4380
4381fn plan_subscript_list(
4382 ecx: &ExprContext,
4383 mut expr: HirScalarExpr,
4384 positions: &[SubscriptPosition<Aug>],
4385 mut remaining_layers: usize,
4386 elem_type_name: &str,
4387) -> Result<CoercibleScalarExpr, PlanError> {
4388 let mut i = 0;
4389
4390 while i < positions.len() {
4391 let j = positions[i..]
4393 .iter()
4394 .position(|p| p.explicit_slice)
4395 .unwrap_or(positions.len() - i);
4396 if j != 0 {
4397 let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4398 let (n, e) = plan_index_list(
4399 ecx,
4400 expr,
4401 indexes.as_slice(),
4402 remaining_layers,
4403 elem_type_name,
4404 )?;
4405 remaining_layers = n;
4406 expr = e;
4407 i += j;
4408 }
4409
4410 let j = positions[i..]
4412 .iter()
4413 .position(|p| !p.explicit_slice)
4414 .unwrap_or(positions.len() - i);
4415 if j != 0 {
4416 expr = plan_slice_list(
4417 ecx,
4418 expr,
4419 &positions[i..i + j],
4420 remaining_layers,
4421 elem_type_name,
4422 )?;
4423 i += j;
4424 }
4425 }
4426
4427 Ok(expr.into())
4428}
4429
4430fn plan_index_list(
4431 ecx: &ExprContext,
4432 expr: HirScalarExpr,
4433 indexes: &[&Expr<Aug>],
4434 n_layers: usize,
4435 elem_type_name: &str,
4436) -> Result<(usize, HirScalarExpr), PlanError> {
4437 let depth = indexes.len();
4438
4439 if depth > n_layers {
4440 if n_layers == 0 {
4441 sql_bail!("cannot subscript type {}", elem_type_name)
4442 } else {
4443 sql_bail!(
4444 "cannot index into {} layers; list only has {} layer{}",
4445 depth,
4446 n_layers,
4447 if n_layers == 1 { "" } else { "s" }
4448 )
4449 }
4450 }
4451
4452 let mut exprs = Vec::with_capacity(depth + 1);
4453 exprs.push(expr);
4454
4455 for i in indexes {
4456 exprs.push(plan_expr(ecx, i)?.cast_to(
4457 ecx,
4458 CastContext::Explicit,
4459 &SqlScalarType::Int64,
4460 )?);
4461 }
4462
4463 Ok((
4464 n_layers - depth,
4465 HirScalarExpr::call_variadic(ListIndex, exprs),
4466 ))
4467}
4468
4469fn plan_slice_list(
4470 ecx: &ExprContext,
4471 expr: HirScalarExpr,
4472 slices: &[SubscriptPosition<Aug>],
4473 n_layers: usize,
4474 elem_type_name: &str,
4475) -> Result<HirScalarExpr, PlanError> {
4476 if n_layers == 0 {
4477 sql_bail!("cannot subscript type {}", elem_type_name)
4478 }
4479
4480 let mut exprs = Vec::with_capacity(slices.len() + 1);
4482 exprs.push(expr);
4483 let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4485 Ok(match position {
4486 Some(p) => {
4487 plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4488 }
4489 None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4490 })
4491 };
4492 for p in slices {
4493 let start = extract_position_or_default(p.start.as_ref(), 1)?;
4494 let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4495 exprs.push(start);
4496 exprs.push(end);
4497 }
4498
4499 Ok(HirScalarExpr::call_variadic(ListSliceLinear, exprs))
4500}
4501
4502fn plan_like(
4503 ecx: &ExprContext,
4504 expr: &Expr<Aug>,
4505 pattern: &Expr<Aug>,
4506 escape: Option<&Expr<Aug>>,
4507 case_insensitive: bool,
4508 not: bool,
4509) -> Result<HirScalarExpr, PlanError> {
4510 use CastContext::Implicit;
4511 let ecx = ecx.with_name("LIKE argument");
4512 let expr = plan_expr(&ecx, expr)?;
4513 let haystack = match ecx.scalar_type(&expr) {
4514 CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4515 .type_as(&ecx, ty)?
4516 .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4517 _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4518 };
4519 let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4520 if let Some(escape) = escape {
4521 pattern = pattern.call_binary(
4522 plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4523 expr_func::LikeEscape,
4524 );
4525 }
4526 let func: BinaryFunc = if case_insensitive {
4527 expr_func::IsLikeMatchCaseInsensitive.into()
4528 } else {
4529 expr_func::IsLikeMatchCaseSensitive.into()
4530 };
4531 let like = haystack.call_binary(pattern, func);
4532 if not {
4533 Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4534 } else {
4535 Ok(like)
4536 }
4537}
4538
4539fn plan_subscript_jsonb(
4540 ecx: &ExprContext,
4541 expr: HirScalarExpr,
4542 positions: &[SubscriptPosition<Aug>],
4543) -> Result<CoercibleScalarExpr, PlanError> {
4544 use CastContext::Implicit;
4545 use SqlScalarType::{Int64, String};
4546
4547 let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4550
4551 let mut exprs = Vec::with_capacity(subscripts.len());
4552 for s in subscripts {
4553 let subscript = plan_expr(ecx, s)?;
4554 let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4555 subscript
4556 } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4557 typeconv::to_string(ecx, subscript)
4561 } else {
4562 sql_bail!("jsonb subscript type must be coercible to integer or text");
4563 };
4564 exprs.push(subscript);
4565 }
4566
4567 let expr = expr.call_binary(
4570 HirScalarExpr::call_variadic(
4571 ArrayCreate {
4572 elem_type: SqlScalarType::String,
4573 },
4574 exprs,
4575 ),
4576 expr_func::JsonbGetPath,
4577 );
4578 Ok(expr.into())
4579}
4580
4581fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4582 if !ecx.allow_subqueries {
4583 sql_bail!("{} does not allow subqueries", ecx.name)
4584 }
4585 let mut qcx = ecx.derived_query_context();
4586 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4587 Ok(expr.exists().into())
4588}
4589
4590fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4591 if !ecx.allow_subqueries {
4592 sql_bail!("{} does not allow subqueries", ecx.name)
4593 }
4594 let mut qcx = ecx.derived_query_context();
4595 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4596 let column_types = qcx.relation_type(&expr).column_types;
4597 if column_types.len() != 1 {
4598 sql_bail!(
4599 "Expected subselect to return 1 column, got {} columns",
4600 column_types.len()
4601 );
4602 }
4603 Ok(expr.select().into())
4604}
4605
4606fn plan_list_subquery(
4607 ecx: &ExprContext,
4608 query: &Query<Aug>,
4609) -> Result<CoercibleScalarExpr, PlanError> {
4610 plan_vector_like_subquery(
4611 ecx,
4612 query,
4613 |_| false,
4614 |elem_type| ListCreate { elem_type }.into(),
4615 |order_by| AggregateFunc::ListConcat { order_by },
4616 expr_func::ListListConcat.into(),
4617 |elem_type| {
4618 HirScalarExpr::literal(
4619 Datum::empty_list(),
4620 SqlScalarType::List {
4621 element_type: Box::new(elem_type),
4622 custom_id: None,
4623 },
4624 )
4625 },
4626 "list",
4627 )
4628}
4629
4630fn plan_array_subquery(
4631 ecx: &ExprContext,
4632 query: &Query<Aug>,
4633) -> Result<CoercibleScalarExpr, PlanError> {
4634 plan_vector_like_subquery(
4635 ecx,
4636 query,
4637 |elem_type| {
4638 matches!(
4639 elem_type,
4640 SqlScalarType::Char { .. }
4641 | SqlScalarType::Array { .. }
4642 | SqlScalarType::List { .. }
4643 | SqlScalarType::Map { .. }
4644 )
4645 },
4646 |elem_type| ArrayCreate { elem_type }.into(),
4647 |order_by| AggregateFunc::ArrayConcat { order_by },
4648 expr_func::ArrayArrayConcat.into(),
4649 |elem_type| {
4650 HirScalarExpr::literal(
4651 Datum::empty_array(),
4652 SqlScalarType::Array(Box::new(elem_type)),
4653 )
4654 },
4655 "[]",
4656 )
4657}
4658
4659fn plan_vector_like_subquery<F1, F2, F3, F4>(
4661 ecx: &ExprContext,
4662 query: &Query<Aug>,
4663 is_unsupported_type: F1,
4664 vector_create: F2,
4665 aggregate_concat: F3,
4666 binary_concat: BinaryFunc,
4667 empty_literal: F4,
4668 vector_type_string: &str,
4669) -> Result<CoercibleScalarExpr, PlanError>
4670where
4671 F1: Fn(&SqlScalarType) -> bool,
4672 F2: Fn(SqlScalarType) -> VariadicFunc,
4673 F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4674 F4: Fn(SqlScalarType) -> HirScalarExpr,
4675{
4676 if !ecx.allow_subqueries {
4677 sql_bail!("{} does not allow subqueries", ecx.name)
4678 }
4679
4680 let mut qcx = ecx.derived_query_context();
4681 let mut planned_query = plan_query(&mut qcx, query)?;
4682 if planned_query.limit.is_some()
4683 || !planned_query
4684 .offset
4685 .clone()
4686 .try_into_literal_int64()
4687 .is_ok_and(|offset| offset == 0)
4688 {
4689 planned_query.expr = HirRelationExpr::top_k(
4690 planned_query.expr,
4691 vec![],
4692 planned_query.order_by.clone(),
4693 planned_query.limit,
4694 planned_query.offset,
4695 planned_query.group_size_hints.limit_input_group_size,
4696 );
4697 }
4698
4699 if planned_query.project.len() != 1 {
4700 sql_bail!(
4701 "Expected subselect to return 1 column, got {} columns",
4702 planned_query.project.len()
4703 );
4704 }
4705
4706 let project_column = *planned_query.project.get(0).unwrap();
4707 let elem_type = qcx
4708 .relation_type(&planned_query.expr)
4709 .column_types
4710 .get(project_column)
4711 .cloned()
4712 .unwrap()
4713 .scalar_type();
4714
4715 if is_unsupported_type(&elem_type) {
4716 bail_unsupported!(format!(
4717 "cannot build array from subquery because return type {}{}",
4718 ecx.humanize_scalar_type(&elem_type, false),
4719 vector_type_string
4720 ));
4721 }
4722
4723 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4726 vector_create(elem_type.clone()),
4727 vec![HirScalarExpr::column(project_column)],
4728 ))
4729 .chain(
4730 planned_query
4731 .order_by
4732 .iter()
4733 .map(|co| HirScalarExpr::column(co.column)),
4734 )
4735 .collect();
4736
4737 let aggregation_projection = vec![0];
4741 let aggregation_order_by = planned_query
4742 .order_by
4743 .into_iter()
4744 .enumerate()
4745 .map(|(i, order)| ColumnOrder { column: i, ..order })
4746 .collect();
4747
4748 let reduced_expr = planned_query
4749 .expr
4750 .reduce(
4751 vec![],
4752 vec![AggregateExpr {
4753 func: aggregate_concat(aggregation_order_by),
4754 expr: Box::new(HirScalarExpr::call_variadic(
4755 RecordCreate {
4756 field_names: iter::repeat(ColumnName::from(""))
4757 .take(aggregation_exprs.len())
4758 .collect(),
4759 },
4760 aggregation_exprs,
4761 )),
4762 distinct: false,
4763 }],
4764 None,
4765 )
4766 .project(aggregation_projection);
4767
4768 Ok(reduced_expr
4770 .select()
4771 .call_binary(empty_literal(elem_type), binary_concat)
4772 .into())
4773}
4774
4775fn plan_map_subquery(
4776 ecx: &ExprContext,
4777 query: &Query<Aug>,
4778) -> Result<CoercibleScalarExpr, PlanError> {
4779 if !ecx.allow_subqueries {
4780 sql_bail!("{} does not allow subqueries", ecx.name)
4781 }
4782
4783 let mut qcx = ecx.derived_query_context();
4784 let mut query = plan_query(&mut qcx, query)?;
4785 if query.limit.is_some()
4786 || !query
4787 .offset
4788 .clone()
4789 .try_into_literal_int64()
4790 .is_ok_and(|offset| offset == 0)
4791 {
4792 query.expr = HirRelationExpr::top_k(
4793 query.expr,
4794 vec![],
4795 query.order_by.clone(),
4796 query.limit,
4797 query.offset,
4798 query.group_size_hints.limit_input_group_size,
4799 );
4800 }
4801 if query.project.len() != 2 {
4802 sql_bail!(
4803 "expected map subquery to return 2 columns, got {} columns",
4804 query.project.len()
4805 );
4806 }
4807
4808 let query_types = qcx.relation_type(&query.expr).column_types;
4809 let key_column = query.project[0];
4810 let key_type = query_types[key_column].clone().scalar_type();
4811 let value_column = query.project[1];
4812 let value_type = query_types[value_column].clone().scalar_type();
4813
4814 if key_type != SqlScalarType::String {
4815 sql_bail!("cannot build map from subquery because first column is not of type text");
4816 }
4817
4818 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4819 RecordCreate {
4820 field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4821 },
4822 vec![
4823 HirScalarExpr::column(key_column),
4824 HirScalarExpr::column(value_column),
4825 ],
4826 ))
4827 .chain(
4828 query
4829 .order_by
4830 .iter()
4831 .map(|co| HirScalarExpr::column(co.column)),
4832 )
4833 .collect();
4834
4835 let expr = query
4836 .expr
4837 .reduce(
4838 vec![],
4839 vec![AggregateExpr {
4840 func: AggregateFunc::MapAgg {
4841 order_by: query
4842 .order_by
4843 .into_iter()
4844 .enumerate()
4845 .map(|(i, order)| ColumnOrder { column: i, ..order })
4846 .collect(),
4847 value_type: value_type.clone(),
4848 },
4849 expr: Box::new(HirScalarExpr::call_variadic(
4850 RecordCreate {
4851 field_names: iter::repeat(ColumnName::from(""))
4852 .take(aggregation_exprs.len())
4853 .collect(),
4854 },
4855 aggregation_exprs,
4856 )),
4857 distinct: false,
4858 }],
4859 None,
4860 )
4861 .project(vec![0]);
4862
4863 let expr = HirScalarExpr::call_variadic(
4865 Coalesce,
4866 vec![
4867 expr.select(),
4868 HirScalarExpr::literal(
4869 Datum::empty_map(),
4870 SqlScalarType::Map {
4871 value_type: Box::new(value_type),
4872 custom_id: None,
4873 },
4874 ),
4875 ],
4876 );
4877
4878 Ok(expr.into())
4879}
4880
4881fn plan_collate(
4882 ecx: &ExprContext,
4883 expr: &Expr<Aug>,
4884 collation: &UnresolvedItemName,
4885) -> Result<CoercibleScalarExpr, PlanError> {
4886 if collation.0.len() == 2
4887 && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4888 && collation.0[1] == ident!("default")
4889 {
4890 plan_expr(ecx, expr)
4891 } else {
4892 bail_unsupported!("COLLATE");
4893 }
4894}
4895
4896fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4903where
4904 E: std::borrow::Borrow<Expr<Aug>>,
4905{
4906 let mut out = vec![];
4907 for expr in exprs {
4908 out.push(plan_expr(ecx, expr.borrow())?);
4909 }
4910 Ok(out)
4911}
4912
4913fn plan_array(
4915 ecx: &ExprContext,
4916 exprs: &[Expr<Aug>],
4917 type_hint: Option<&SqlScalarType>,
4918) -> Result<CoercibleScalarExpr, PlanError> {
4919 let mut out = vec![];
4921 for expr in exprs {
4922 out.push(match expr {
4923 Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4926 _ => plan_expr(ecx, expr)?,
4927 });
4928 }
4929
4930 let type_hint = match type_hint {
4932 Some(SqlScalarType::Array(elem_type)) => {
4937 let multidimensional = out.iter().any(|e| {
4938 matches!(
4939 ecx.scalar_type(e),
4940 CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4941 )
4942 });
4943 if multidimensional {
4944 type_hint
4945 } else {
4946 Some(&**elem_type)
4947 }
4948 }
4949 Some(_) => None,
4953 None => None,
4955 };
4956
4957 let (elem_type, exprs) = if exprs.is_empty() {
4959 if let Some(elem_type) = type_hint {
4960 (elem_type.clone(), vec![])
4961 } else {
4962 sql_bail!("cannot determine type of empty array");
4963 }
4964 } else {
4965 let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4966 (ecx.scalar_type(&out[0]), out)
4967 };
4968
4969 if matches!(
4975 elem_type,
4976 SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4977 ) {
4978 bail_unsupported!(format!("{}[]", ecx.humanize_scalar_type(&elem_type, false)));
4979 }
4980
4981 Ok(HirScalarExpr::call_variadic(ArrayCreate { elem_type }, exprs).into())
4982}
4983
4984fn plan_list(
4985 ecx: &ExprContext,
4986 exprs: &[Expr<Aug>],
4987 type_hint: Option<&SqlScalarType>,
4988) -> Result<CoercibleScalarExpr, PlanError> {
4989 let (elem_type, exprs) = if exprs.is_empty() {
4990 if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
4991 (element_type.without_modifiers(), vec![])
4992 } else {
4993 sql_bail!("cannot determine type of empty list");
4994 }
4995 } else {
4996 let type_hint = match type_hint {
4997 Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
4998 _ => None,
4999 };
5000
5001 let mut out = vec![];
5002 for expr in exprs {
5003 out.push(match expr {
5004 Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
5007 _ => plan_expr(ecx, expr)?,
5008 });
5009 }
5010 let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
5011 (ecx.scalar_type(&out[0]).without_modifiers(), out)
5012 };
5013
5014 if matches!(elem_type, SqlScalarType::Char { .. }) {
5015 bail_unsupported!("char list");
5016 }
5017
5018 Ok(HirScalarExpr::call_variadic(ListCreate { elem_type }, exprs).into())
5019}
5020
5021fn plan_map(
5022 ecx: &ExprContext,
5023 entries: &[MapEntry<Aug>],
5024 type_hint: Option<&SqlScalarType>,
5025) -> Result<CoercibleScalarExpr, PlanError> {
5026 let (value_type, exprs) = if entries.is_empty() {
5027 if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
5028 (value_type.without_modifiers(), vec![])
5029 } else {
5030 sql_bail!("cannot determine type of empty map");
5031 }
5032 } else {
5033 let type_hint = match type_hint {
5034 Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5035 _ => None,
5036 };
5037
5038 let mut keys = vec![];
5039 let mut values = vec![];
5040 for MapEntry { key, value } in entries {
5041 let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5042 let value = match value {
5043 Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5046 _ => plan_expr(ecx, value)?,
5047 };
5048 keys.push(key);
5049 values.push(value);
5050 }
5051 let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5052 let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5053 let out = itertools::interleave(keys, values).collect();
5054 (value_type, out)
5055 };
5056
5057 if matches!(value_type, SqlScalarType::Char { .. }) {
5058 bail_unsupported!("char map");
5059 }
5060
5061 let expr = HirScalarExpr::call_variadic(MapBuild { value_type }, exprs);
5062 Ok(expr.into())
5063}
5064
5065pub fn coerce_homogeneous_exprs(
5082 ecx: &ExprContext,
5083 exprs: Vec<CoercibleScalarExpr>,
5084 force_type: Option<&SqlScalarType>,
5085) -> Result<Vec<HirScalarExpr>, PlanError> {
5086 assert!(!exprs.is_empty());
5087
5088 let target_holder;
5089 let target = match force_type {
5090 Some(t) => t,
5091 None => {
5092 let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5093 target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5094 &target_holder
5095 }
5096 };
5097
5098 let mut out = Vec::new();
5100 for expr in exprs {
5101 let arg = typeconv::plan_coerce(ecx, expr, target)?;
5102 let ccx = match force_type {
5103 None => CastContext::Implicit,
5104 Some(_) => CastContext::Explicit,
5105 };
5106 match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5107 Ok(expr) => out.push(expr),
5108 Err(_) => sql_bail!(
5109 "{} could not convert type {} to {}",
5110 ecx.name,
5111 ecx.humanize_scalar_type(&ecx.scalar_type(&arg), false),
5112 ecx.humanize_scalar_type(target, false),
5113 ),
5114 }
5115 }
5116 Ok(out)
5117}
5118
5119pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5122 obe: &OrderByExpr<T>,
5123 column: usize,
5124) -> ColumnOrder {
5125 let desc = !obe.asc.unwrap_or(true);
5126 ColumnOrder {
5127 column,
5128 desc,
5129 nulls_last: obe.nulls_last.unwrap_or(!desc),
5132 }
5133}
5134
5135fn plan_function_order_by(
5143 ecx: &ExprContext,
5144 order_by: &[OrderByExpr<Aug>],
5145) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5146 let mut order_by_exprs = vec![];
5147 let mut col_orders = vec![];
5148 {
5149 for (i, obe) in order_by.iter().enumerate() {
5150 let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5154 order_by_exprs.push(expr);
5155 col_orders.push(resolve_desc_and_nulls_last(obe, i));
5156 }
5157 }
5158 Ok((order_by_exprs, col_orders))
5159}
5160
5161fn plan_aggregate_common(
5163 ecx: &ExprContext,
5164 Function::<Aug> {
5165 name,
5166 args,
5167 filter,
5168 over: _,
5169 distinct,
5170 }: &Function<Aug>,
5171) -> Result<AggregateExpr, PlanError> {
5172 let impls = match resolve_func(ecx, name, args)? {
5187 Func::Aggregate(impls) => impls,
5188 _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
5189 };
5190
5191 let (args, order_by) = match &args {
5200 FunctionArgs::Star => (vec![], vec![]),
5201 FunctionArgs::Args { args, order_by } => {
5202 if args.is_empty() {
5203 sql_bail!(
5204 "{}(*) must be used to call a parameterless aggregate function",
5205 ecx.qcx
5206 .scx
5207 .humanize_resolved_name(name)
5208 .expect("name actually resolved")
5209 );
5210 }
5211 let args = plan_exprs(ecx, args)?;
5212 (args, order_by.clone())
5213 }
5214 };
5215
5216 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5217
5218 let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5219 if let Some(filter) = &filter {
5220 let cond =
5230 plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5231 let expr_typ = ecx.scalar_type(&expr);
5232 expr = HirScalarExpr::if_then_else(
5233 cond,
5234 expr,
5235 HirScalarExpr::literal(func.identity_datum(), expr_typ),
5236 );
5237 }
5238
5239 let mut seen_outer = false;
5240 let mut seen_inner = false;
5241 #[allow(deprecated)]
5242 expr.visit_columns(0, &mut |depth, col| {
5243 if depth == 0 && col.level == 0 {
5244 seen_inner = true;
5245 } else if col.level > depth {
5246 seen_outer = true;
5247 }
5248 });
5249 if seen_outer && !seen_inner {
5250 bail_unsupported!(
5251 3720,
5252 "aggregate functions that refer exclusively to outer columns"
5253 );
5254 }
5255
5256 if func.is_order_sensitive() {
5259 let field_names = iter::repeat(ColumnName::from(""))
5260 .take(1 + order_by_exprs.len())
5261 .collect();
5262 let mut exprs = vec![expr];
5263 exprs.extend(order_by_exprs);
5264 expr = HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs);
5265 }
5266
5267 Ok(AggregateExpr {
5268 func,
5269 expr: Box::new(expr),
5270 distinct: *distinct,
5271 })
5272}
5273
5274fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
5275 let mut names = names.to_vec();
5276 let col_name = normalize::column_name(names.pop().unwrap());
5277
5278 if !names.is_empty() {
5280 let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
5281 let (i, i_name) = ecx.scope.resolve_table_column(
5282 &ecx.qcx.outer_scopes,
5283 &table_name,
5284 &col_name,
5285 &mut ecx.qcx.name_manager.borrow_mut(),
5286 )?;
5287 return Ok(HirScalarExpr::named_column(i, i_name));
5288 }
5289
5290 let similar_names = match ecx.scope.resolve_column(
5293 &ecx.qcx.outer_scopes,
5294 &col_name,
5295 &mut ecx.qcx.name_manager.borrow_mut(),
5296 ) {
5297 Ok((i, i_name)) => {
5298 return Ok(HirScalarExpr::named_column(i, i_name));
5299 }
5300 Err(PlanError::UnknownColumn { similar, .. }) => similar,
5301 Err(e) => return Err(e),
5302 };
5303
5304 let items = ecx.scope.items_from_table(
5307 &ecx.qcx.outer_scopes,
5308 &PartialItemName {
5309 database: None,
5310 schema: None,
5311 item: col_name.as_str().to_owned(),
5312 },
5313 )?;
5314 match items.as_slice() {
5315 [] => Err(PlanError::UnknownColumn {
5317 table: None,
5318 column: col_name,
5319 similar: similar_names,
5320 }),
5321 [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
5326 *column,
5327 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5328 )),
5329 _ => {
5332 let mut has_exists_column = None;
5333 let (exprs, field_names): (Vec<_>, Vec<_>) = items
5334 .into_iter()
5335 .filter_map(|(column, item)| {
5336 if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
5337 has_exists_column = Some(column);
5338 None
5339 } else {
5340 let expr = HirScalarExpr::named_column(
5341 column,
5342 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5343 );
5344 let name = item.column_name.clone();
5345 Some((expr, name))
5346 }
5347 })
5348 .unzip();
5349 let expr = if exprs.len() == 1 && has_exists_column.is_some() {
5351 exprs.into_element()
5352 } else {
5353 HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs)
5354 };
5355 if let Some(has_exists_column) = has_exists_column {
5356 Ok(HirScalarExpr::if_then_else(
5357 HirScalarExpr::unnamed_column(has_exists_column)
5358 .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
5359 HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
5360 expr,
5361 ))
5362 } else {
5363 Ok(expr)
5364 }
5365 }
5366 }
5367}
5368
5369fn plan_op(
5370 ecx: &ExprContext,
5371 op: &str,
5372 expr1: &Expr<Aug>,
5373 expr2: Option<&Expr<Aug>>,
5374) -> Result<HirScalarExpr, PlanError> {
5375 let impls = func::resolve_op(op)?;
5376 let args = match expr2 {
5377 None => plan_exprs(ecx, &[expr1])?,
5378 Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5379 };
5380 func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5381}
5382
/// Plans a function call that appears inside a scalar expression.
///
/// The function is looked up with `resolve_func` and handled by kind:
///
/// * Table functions are rejected.
/// * Scalar functions must not have an `OVER` clause; they are planned at the
///   bottom of this function after `DISTINCT`, `FILTER`, `*` and in-call
///   `ORDER BY` have been rejected.
/// * Scalar window functions and value window functions are planned into a
///   `HirScalarExpr::windowing` expression and returned early.
/// * Aggregate functions are only planned here when they have an `OVER`
///   clause (window aggregates); an aggregate without `OVER` is an error.
///
/// # Errors
///
/// Besides propagating errors from the helpers it calls, returns an error for
/// each unsupported combination described above. The order of the checks
/// below determines which error is reported when several apply.
fn plan_function<'a>(
    ecx: &ExprContext,
    f @ Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &'a Function<Aug>,
) -> Result<HirScalarExpr, PlanError> {
    let impls = match resolve_func(ecx, name, args)? {
        Func::Table(_) => {
            sql_bail!(
                "table functions are not allowed in {} (function {})",
                ecx.name,
                name
            );
        }
        Func::Scalar(impls) => {
            if over.is_some() {
                sql_bail!(
                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
                );
            }
            // Plain scalar function: planned after the match.
            impls
        }
        Func::ScalarWindow(impls) => {
            // The window frame is planned (and validated) but not used for
            // scalar window functions.
            let (
                ignore_nulls,
                order_by_exprs,
                col_orders,
                _window_frame,
                partition_by,
                scalar_args,
            ) = plan_window_function_non_aggr(ecx, f)?;

            // Report a dedicated error when arguments were supplied, before
            // `func::select_impl` gets a chance to reject the call.
            if !scalar_args.is_empty() {
                if let ResolvedItemName::Item {
                    full_name: FullItemName { item, .. },
                    ..
                } = name
                {
                    sql_bail!(
                        "function {} has 0 parameters, but was called with {}",
                        item,
                        scalar_args.len()
                    );
                }
            }

            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;

            // IGNORE NULLS is not supported for any scalar window function.
            if ignore_nulls {
                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
            }

            return Ok(HirScalarExpr::windowing(WindowExpr {
                func: WindowExprType::Scalar(ScalarWindowExpr {
                    func,
                    order_by: col_orders,
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::ValueWindow(impls) => {
            let window_plan = plan_window_function_non_aggr(ecx, f)?;
            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, win_args) =
                window_plan;

            let (args_encoded, func) =
                func::select_impl(ecx, FuncSpec::Func(name), impls, win_args, vec![])?;

            // IGNORE NULLS is only accepted for LAG and LEAD.
            if ignore_nulls {
                match func {
                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
                }
            }

            return Ok(HirScalarExpr::windowing(WindowExpr {
                func: WindowExprType::Value(ValueWindowExpr {
                    func,
                    args: Box::new(args_encoded),
                    order_by: col_orders,
                    window_frame,
                    ignore_nulls,
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::Aggregate(_) => {
            if f.over.is_none() {
                // An aggregate without OVER is not planned by this function.
                // The two branches differ only in the message: an internal
                // error when aggregates are allowed in this context, a user
                // error naming the context otherwise.
                if ecx.allow_aggregates {
                    sql_bail!(
                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
                        name,
                    );
                } else {
                    sql_bail!(
                        "aggregate functions are not allowed in {} (function {})",
                        ecx.name,
                        name
                    );
                }
            } else {
                // Window aggregate.
                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
                    plan_window_function_common(ecx, &f.name, &f.over)?;

                // Frames that combine an unbounded bound with an offset bound
                // are rejected for window aggregates.
                match (&window_frame.start_bound, &window_frame.end_bound) {
                    (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    ) => bail_unsupported!("mixed unbounded - offset frames"),
                    // Every other combination is accepted here.
                    (_, _) => {}
                }

                if ignore_nulls {
                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
                }

                let aggregate_expr = plan_aggregate_common(ecx, f)?;

                if aggregate_expr.distinct {
                    bail_unsupported!("DISTINCT in window aggregates");
                }

                return Ok(HirScalarExpr::windowing(WindowExpr {
                    func: WindowExprType::Aggregate(AggregateWindowExpr {
                        aggregate_expr,
                        order_by: col_orders,
                        window_frame,
                    }),
                    partition_by,
                    order_by: order_by_exprs,
                }));
            }
        }
    };

    // Only plain scalar functions reach this point, and those were already
    // rejected above if they carried an OVER clause.
    if over.is_some() {
        unreachable!("If there is an OVER clause, we should have returned already above.");
    }

    if *distinct {
        sql_bail!(
            "DISTINCT specified, but {} is not an aggregate function",
            ecx.qcx
                .scx
                .humanize_resolved_name(name)
                .expect("already resolved")
        );
    }
    if filter.is_some() {
        sql_bail!(
            "FILTER specified, but {} is not an aggregate function",
            ecx.qcx
                .scx
                .humanize_resolved_name(name)
                .expect("already resolved")
        );
    }

    let scalar_args = match &args {
        FunctionArgs::Star => {
            sql_bail!(
                "* argument is invalid with non-aggregate function {}",
                ecx.qcx
                    .scx
                    .humanize_resolved_name(name)
                    .expect("already resolved")
            )
        }
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    ecx.qcx
                        .scx
                        .humanize_resolved_name(name)
                        .expect("already resolved")
                );
            }
            plan_exprs(ecx, args)?
        }
    };

    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
}
5604
/// Description of the unsupported feature that `plan_function` reports via
/// `bail_unsupported!` when `IGNORE NULLS` is requested for a window function
/// other than `LAG` or `LEAD`.
pub const IGNORE_NULLS_ERROR_MSG: &str =
    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5607
5608pub fn resolve_func(
5612 ecx: &ExprContext,
5613 name: &ResolvedItemName,
5614 args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5615) -> Result<&'static Func, PlanError> {
5616 if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5617 if let Ok(f) = i.func() {
5618 return Ok(f);
5619 }
5620 }
5621
5622 let cexprs = match args {
5625 mz_sql_parser::ast::FunctionArgs::Star => vec![],
5626 mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5627 if !order_by.is_empty() {
5628 sql_bail!(
5629 "ORDER BY specified, but {} is not an aggregate function",
5630 name
5631 );
5632 }
5633 plan_exprs(ecx, args)?
5634 }
5635 };
5636
5637 let arg_types: Vec<_> = cexprs
5638 .into_iter()
5639 .map(|ty| match ecx.scalar_type(&ty) {
5640 CoercibleScalarType::Coerced(ty) => ecx.humanize_scalar_type(&ty, false),
5641 CoercibleScalarType::Record(_) => "record".to_string(),
5642 CoercibleScalarType::Uncoerced => "unknown".to_string(),
5643 })
5644 .collect();
5645
5646 Err(PlanError::UnknownFunction {
5647 name: name.to_string(),
5648 arg_types,
5649 })
5650}
5651
5652fn plan_is_expr<'a>(
5653 ecx: &ExprContext,
5654 expr: &'a Expr<Aug>,
5655 construct: &IsExprConstruct<Aug>,
5656 not: bool,
5657) -> Result<HirScalarExpr, PlanError> {
5658 let expr_hir = plan_expr(ecx, expr)?;
5659
5660 let mut result = match construct {
5661 IsExprConstruct::Null => {
5662 expr_hir.type_as_any(ecx)?.call_is_null()
5667 }
5668 IsExprConstruct::Unknown => expr_hir.type_as(ecx, &SqlScalarType::Bool)?.call_is_null(),
5669 IsExprConstruct::True => expr_hir
5670 .type_as(ecx, &SqlScalarType::Bool)?
5671 .call_unary(UnaryFunc::IsTrue(expr_func::IsTrue)),
5672 IsExprConstruct::False => expr_hir
5673 .type_as(ecx, &SqlScalarType::Bool)?
5674 .call_unary(UnaryFunc::IsFalse(expr_func::IsFalse)),
5675 IsExprConstruct::DistinctFrom(expr2) => {
5676 let ne_ast = expr.clone().not_equals(expr2.as_ref().clone());
5687 let ne_hir = plan_expr(ecx, &ne_ast)?.type_as_any(ecx)?;
5688
5689 let expr1_hir = expr_hir.type_as_any(ecx)?;
5690 let expr2_hir = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5691
5692 let term1 = HirScalarExpr::variadic_or(vec![
5693 ne_hir,
5694 expr1_hir.clone().call_is_null(),
5695 expr2_hir.clone().call_is_null(),
5696 ]);
5697 let term2 = HirScalarExpr::variadic_or(vec![
5698 expr1_hir.call_is_null().not(),
5699 expr2_hir.call_is_null().not(),
5700 ]);
5701 term1.and(term2)
5702 }
5703 };
5704 if not {
5705 result = result.not();
5706 }
5707 Ok(result)
5708}
5709
5710fn plan_case<'a>(
5711 ecx: &ExprContext,
5712 operand: &'a Option<Box<Expr<Aug>>>,
5713 conditions: &'a [Expr<Aug>],
5714 results: &'a [Expr<Aug>],
5715 else_result: &'a Option<Box<Expr<Aug>>>,
5716) -> Result<HirScalarExpr, PlanError> {
5717 let mut cond_exprs = Vec::new();
5718 let mut result_exprs = Vec::new();
5719 for (c, r) in conditions.iter().zip_eq(results) {
5720 let c = match operand {
5721 Some(operand) => operand.clone().equals(c.clone()),
5722 None => c.clone(),
5723 };
5724 let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5725 cond_exprs.push(cexpr);
5726 result_exprs.push(r);
5727 }
5728 result_exprs.push(match else_result {
5729 Some(else_result) => else_result,
5730 None => &Expr::Value(Value::Null),
5731 });
5732 let mut result_exprs = coerce_homogeneous_exprs(
5733 &ecx.with_name("CASE"),
5734 plan_exprs(ecx, &result_exprs)?,
5735 None,
5736 )?;
5737 let mut expr = result_exprs.pop().unwrap();
5738 assert_eq!(cond_exprs.len(), result_exprs.len());
5739 for (cexpr, rexpr) in cond_exprs
5740 .into_iter()
5741 .rev()
5742 .zip_eq(result_exprs.into_iter().rev())
5743 {
5744 expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5745 }
5746 Ok(expr)
5747}
5748
5749fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5750 let (datum, scalar_type) = match l {
5751 Value::Number(s) => {
5752 let d = strconv::parse_numeric(s.as_str())?;
5753 if !s.contains(&['E', '.'][..]) {
5754 if let Ok(n) = d.0.try_into() {
5756 (Datum::Int32(n), SqlScalarType::Int32)
5757 } else if let Ok(n) = d.0.try_into() {
5758 (Datum::Int64(n), SqlScalarType::Int64)
5759 } else {
5760 (
5761 Datum::Numeric(d),
5762 SqlScalarType::Numeric { max_scale: None },
5763 )
5764 }
5765 } else {
5766 (
5767 Datum::Numeric(d),
5768 SqlScalarType::Numeric { max_scale: None },
5769 )
5770 }
5771 }
5772 Value::HexString(_) => bail_unsupported!("hex string literals"),
5773 Value::Boolean(b) => match b {
5774 false => (Datum::False, SqlScalarType::Bool),
5775 true => (Datum::True, SqlScalarType::Bool),
5776 },
5777 Value::Interval(i) => {
5778 let i = literal::plan_interval(i)?;
5779 (Datum::Interval(i), SqlScalarType::Interval)
5780 }
5781 Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5782 Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5783 };
5784 let expr = HirScalarExpr::literal(datum, scalar_type);
5785 Ok(expr.into())
5786}
5787
5788fn plan_window_function_non_aggr<'a>(
5791 ecx: &ExprContext,
5792 Function {
5793 name,
5794 args,
5795 filter,
5796 over,
5797 distinct,
5798 }: &'a Function<Aug>,
5799) -> Result<
5800 (
5801 bool,
5802 Vec<HirScalarExpr>,
5803 Vec<ColumnOrder>,
5804 mz_expr::WindowFrame,
5805 Vec<HirScalarExpr>,
5806 Vec<CoercibleScalarExpr>,
5807 ),
5808 PlanError,
5809> {
5810 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5811 plan_window_function_common(ecx, name, over)?;
5812
5813 if *distinct {
5814 sql_bail!(
5815 "DISTINCT specified, but {} is not an aggregate function",
5816 name
5817 );
5818 }
5819
5820 if filter.is_some() {
5821 bail_unsupported!("FILTER in non-aggregate window functions");
5822 }
5823
5824 let scalar_args = match &args {
5825 FunctionArgs::Star => {
5826 sql_bail!("* argument is invalid with non-aggregate function {}", name)
5827 }
5828 FunctionArgs::Args { args, order_by } => {
5829 if !order_by.is_empty() {
5830 sql_bail!(
5831 "ORDER BY specified, but {} is not an aggregate function",
5832 name
5833 );
5834 }
5835 plan_exprs(ecx, args)?
5836 }
5837 };
5838
5839 Ok((
5840 ignore_nulls,
5841 order_by_exprs,
5842 col_orders,
5843 window_frame,
5844 partition,
5845 scalar_args,
5846 ))
5847}
5848
5849fn plan_window_function_common(
5851 ecx: &ExprContext,
5852 name: &<Aug as AstInfo>::ItemName,
5853 over: &Option<WindowSpec<Aug>>,
5854) -> Result<
5855 (
5856 bool,
5857 Vec<HirScalarExpr>,
5858 Vec<ColumnOrder>,
5859 mz_expr::WindowFrame,
5860 Vec<HirScalarExpr>,
5861 ),
5862 PlanError,
5863> {
5864 if !ecx.allow_windows {
5865 sql_bail!(
5866 "window functions are not allowed in {} (function {})",
5867 ecx.name,
5868 name
5869 );
5870 }
5871
5872 let window_spec = match over.as_ref() {
5873 Some(over) => over,
5874 None => sql_bail!("window function {} requires an OVER clause", name),
5875 };
5876 if window_spec.ignore_nulls && window_spec.respect_nulls {
5877 sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5878 }
5879 let window_frame = match window_spec.window_frame.as_ref() {
5880 Some(frame) => plan_window_frame(frame)?,
5881 None => mz_expr::WindowFrame::default(),
5882 };
5883 let mut partition = Vec::new();
5884 for expr in &window_spec.partition_by {
5885 partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5886 }
5887
5888 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5889
5890 Ok((
5891 window_spec.ignore_nulls,
5892 order_by_exprs,
5893 col_orders,
5894 window_frame,
5895 partition,
5896 ))
5897}
5898
5899fn plan_window_frame(
5900 WindowFrame {
5901 units,
5902 start_bound,
5903 end_bound,
5904 }: &WindowFrame,
5905) -> Result<mz_expr::WindowFrame, PlanError> {
5906 use mz_expr::WindowFrameBound::*;
5907 let units = window_frame_unit_ast_to_expr(units)?;
5908 let start_bound = window_frame_bound_ast_to_expr(start_bound);
5909 let end_bound = end_bound
5910 .as_ref()
5911 .map(window_frame_bound_ast_to_expr)
5912 .unwrap_or(CurrentRow);
5913
5914 match (&start_bound, &end_bound) {
5916 (UnboundedFollowing, _) => {
5918 sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5919 }
5920 (_, UnboundedPreceding) => {
5922 sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5923 }
5924 (CurrentRow, OffsetPreceding(_)) => {
5926 sql_bail!("frame starting from current row cannot have preceding rows")
5927 }
5928 (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5929 sql_bail!("frame starting from following row cannot have preceding rows")
5930 }
5931 (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5934 if *o1 > 1000000 || *o2 > 1000000 {
5938 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5939 }
5940 }
5941 (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5942 if *o1 > 1000000 || *o2 > 1000000 {
5943 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5944 }
5945 }
5946 (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5947 if *o1 > 1000000 || *o2 > 1000000 {
5948 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5949 }
5950 }
5951 (OffsetPreceding(o), CurrentRow) => {
5952 if *o > 1000000 {
5953 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5954 }
5955 }
5956 (CurrentRow, OffsetFollowing(o)) => {
5957 if *o > 1000000 {
5958 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5959 }
5960 }
5961 (_, _) => (),
5963 }
5964
5965 if units == mz_expr::WindowFrameUnits::Range
5968 && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5969 {
5970 bail_unsupported!("RANGE in non-default window frames")
5971 }
5972
5973 let frame = mz_expr::WindowFrame {
5974 units,
5975 start_bound,
5976 end_bound,
5977 };
5978 Ok(frame)
5979}
5980
5981fn window_frame_unit_ast_to_expr(
5982 unit: &WindowFrameUnits,
5983) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5984 match unit {
5985 WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5986 WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5987 WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5988 }
5989}
5990
5991fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5992 match bound {
5993 WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5994 WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5995 WindowFrameBound::Preceding(Some(offset)) => {
5996 mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5997 }
5998 WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
5999 WindowFrameBound::Following(Some(offset)) => {
6000 mz_expr::WindowFrameBound::OffsetFollowing(*offset)
6001 }
6002 }
6003}
6004
6005pub fn scalar_type_from_sql(
6006 scx: &StatementContext,
6007 data_type: &ResolvedDataType,
6008) -> Result<SqlScalarType, PlanError> {
6009 match data_type {
6010 ResolvedDataType::AnonymousList(elem_type) => {
6011 let elem_type = scalar_type_from_sql(scx, elem_type)?;
6012 if matches!(elem_type, SqlScalarType::Char { .. }) {
6013 bail_unsupported!("char list");
6014 }
6015 Ok(SqlScalarType::List {
6016 element_type: Box::new(elem_type),
6017 custom_id: None,
6018 })
6019 }
6020 ResolvedDataType::AnonymousMap {
6021 key_type,
6022 value_type,
6023 } => {
6024 match scalar_type_from_sql(scx, key_type)? {
6025 SqlScalarType::String => {}
6026 other => sql_bail!(
6027 "map key type must be {}, got {}",
6028 scx.humanize_scalar_type(&SqlScalarType::String, false),
6029 scx.humanize_scalar_type(&other, false)
6030 ),
6031 }
6032 Ok(SqlScalarType::Map {
6033 value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
6034 custom_id: None,
6035 })
6036 }
6037 ResolvedDataType::Named { id, modifiers, .. } => {
6038 scalar_type_from_catalog(scx.catalog, *id, modifiers)
6039 }
6040 ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
6041 }
6042}
6043
/// Converts the catalog type identified by `id`, with type modifiers
/// `modifiers`, into a [`SqlScalarType`].
///
/// `numeric`, `char`, `varchar`, `timestamp` and `timestamptz` interpret
/// their modifiers (precision/scale, length, precision); every other type
/// rejects a non-empty modifier list. List, map and record types built here
/// carry `id` as their custom id, and element/value/field types are resolved
/// recursively through the catalog.
///
/// # Errors
///
/// Fails if `id` does not refer to a type, if a modifier is out of range or
/// there are too many modifiers, if modifiers are given for a type that does
/// not support them, or if the type is a pseudo type.
pub fn scalar_type_from_catalog(
    catalog: &dyn SessionCatalog,
    id: CatalogItemId,
    modifiers: &[i64],
) -> Result<SqlScalarType, PlanError> {
    let entry = catalog.get_item(&id);
    let type_details = match entry.type_details() {
        Some(type_details) => type_details,
        None => {
            sql_bail!(
                "internal error: {} does not refer to a type",
                catalog.resolve_full_name(entry.name()).to_string().quoted()
            );
        }
    };
    match &type_details.typ {
        CatalogType::Numeric => {
            // Modifiers are (precision, scale), both optional.
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
                    sql_bail!(
                        "precision for type numeric must be between 1 and {}",
                        NUMERIC_DATUM_MAX_PRECISION,
                    );
                }
                Some(p) => Some(*p),
                None => None,
            };
            let scale = match modifiers.next() {
                Some(scale) => {
                    // The scale may not exceed the precision; any further
                    // range checking is left to `NumericMaxScale::try_from`.
                    if let Some(precision) = precision {
                        if *scale > precision {
                            sql_bail!(
                                "scale for type numeric must be between 0 and precision {}",
                                precision
                            );
                        }
                    }
                    Some(NumericMaxScale::try_from(*scale)?)
                }
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type numeric supports at most two type modifiers");
            }
            // Only the scale is recorded in the resulting type; the precision
            // is used for validation alone.
            Ok(SqlScalarType::Numeric { max_scale: scale })
        }
        CatalogType::Char => {
            let mut modifiers = modifiers.iter().fuse();
            // Without a modifier, the length defaults to `CharLength::ONE`.
            let length = match modifiers.next() {
                Some(l) => Some(CharLength::try_from(*l)?),
                None => Some(CharLength::ONE),
            };
            if modifiers.next().is_some() {
                sql_bail!("type character supports at most one type modifier");
            }
            Ok(SqlScalarType::Char { length })
        }
        CatalogType::VarChar => {
            let mut modifiers = modifiers.iter().fuse();
            // Without a modifier, no maximum length is recorded.
            let length = match modifiers.next() {
                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type character varying supports at most one type modifier");
            }
            Ok(SqlScalarType::VarChar { max_length: length })
        }
        CatalogType::Timestamp => {
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) => Some(TimestampPrecision::try_from(*p)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type timestamp supports at most one type modifier");
            }
            Ok(SqlScalarType::Timestamp { precision })
        }
        CatalogType::TimestampTz => {
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) => Some(TimestampPrecision::try_from(*p)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type timestamp with time zone supports at most one type modifier");
            }
            Ok(SqlScalarType::TimestampTz { precision })
        }
        // None of the remaining types accepts type modifiers.
        t => {
            if !modifiers.is_empty() {
                sql_bail!(
                    "{} does not support type modifiers",
                    catalog.resolve_full_name(entry.name()).to_string()
                );
            }
            match t {
                // `modifiers` is known to be empty here, so the element type
                // is resolved without modifiers.
                CatalogType::Array {
                    element_reference: element_id,
                } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
                    catalog,
                    *element_id,
                    modifiers,
                )?))),
                CatalogType::List {
                    element_reference: element_id,
                    element_modifiers,
                } => Ok(SqlScalarType::List {
                    element_type: Box::new(scalar_type_from_catalog(
                        catalog,
                        *element_id,
                        element_modifiers,
                    )?),
                    custom_id: Some(id),
                }),
                // The key reference and key modifiers are not used when
                // building the map type.
                CatalogType::Map {
                    key_reference: _,
                    key_modifiers: _,
                    value_reference: value_id,
                    value_modifiers,
                } => Ok(SqlScalarType::Map {
                    value_type: Box::new(scalar_type_from_catalog(
                        catalog,
                        *value_id,
                        value_modifiers,
                    )?),
                    custom_id: Some(id),
                }),
                CatalogType::Range {
                    element_reference: element_id,
                } => Ok(SqlScalarType::Range {
                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
                }),
                CatalogType::Record { fields } => {
                    // Every record field is resolved with its own modifiers
                    // and marked nullable.
                    let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
                        .iter()
                        .map(|f| {
                            let scalar_type = scalar_type_from_catalog(
                                catalog,
                                f.type_reference,
                                &f.type_modifiers,
                            )?;
                            Ok((
                                f.name.clone(),
                                SqlColumnType {
                                    scalar_type,
                                    nullable: true,
                                },
                            ))
                        })
                        .collect::<Result<Box<_>, PlanError>>()?;
                    Ok(SqlScalarType::Record {
                        fields: scalars,
                        custom_id: Some(id),
                    })
                }
                CatalogType::AclItem => Ok(SqlScalarType::AclItem),
                CatalogType::Bool => Ok(SqlScalarType::Bool),
                CatalogType::Bytes => Ok(SqlScalarType::Bytes),
                CatalogType::Date => Ok(SqlScalarType::Date),
                CatalogType::Float32 => Ok(SqlScalarType::Float32),
                CatalogType::Float64 => Ok(SqlScalarType::Float64),
                CatalogType::Int16 => Ok(SqlScalarType::Int16),
                CatalogType::Int32 => Ok(SqlScalarType::Int32),
                CatalogType::Int64 => Ok(SqlScalarType::Int64),
                CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
                CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
                CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
                CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
                CatalogType::Interval => Ok(SqlScalarType::Interval),
                CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
                CatalogType::Oid => Ok(SqlScalarType::Oid),
                CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
                CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
                CatalogType::Pseudo => {
                    sql_bail!(
                        "cannot reference pseudo type {}",
                        catalog.resolve_full_name(entry.name()).to_string()
                    )
                }
                CatalogType::RegClass => Ok(SqlScalarType::RegClass),
                CatalogType::RegProc => Ok(SqlScalarType::RegProc),
                CatalogType::RegType => Ok(SqlScalarType::RegType),
                CatalogType::String => Ok(SqlScalarType::String),
                CatalogType::Time => Ok(SqlScalarType::Time),
                CatalogType::Uuid => Ok(SqlScalarType::Uuid),
                CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
                CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
                // These variants are matched by the outer `match` and are
                // listed here only to keep the inner match exhaustive.
                CatalogType::Numeric => unreachable!("handled above"),
                CatalogType::Char => unreachable!("handled above"),
                CatalogType::VarChar => unreachable!("handled above"),
                CatalogType::Timestamp => unreachable!("handled above"),
                CatalogType::TimestampTz => unreachable!("handled above"),
            }
        }
    }
}
6245
/// AST visitor that collects non-window aggregate function calls and, inside
/// select items, replaces table function calls with generated identifiers.
struct AggregateTableFuncVisitor<'a> {
    /// Statement context used to resolve function names.
    scx: &'a StatementContext<'a>,
    /// Aggregate calls without an `OVER` clause, in the order they were
    /// visited; may contain duplicates until `into_result` removes them.
    aggs: Vec<Function<Aug>>,
    /// Set while the arguments of an aggregate call are being visited; used
    /// to reject nested aggregates.
    within_aggregate: bool,
    /// Table function calls that were replaced, mapped to the identifier that
    /// now stands in for each of them.
    tables: BTreeMap<Function<Aug>, String>,
    /// Stack of descriptions of the enclosing constructs in which table
    /// functions are not allowed; the innermost one is used in error messages.
    table_disallowed_context: Vec<&'static str>,
    /// Set while a select item is being visited.
    in_select_item: bool,
    /// Source of the numeric suffixes for generated `table_func_<id>` names.
    id_gen: IdGen,
    /// Error recorded during the visit, if any; reported by `into_result`.
    err: Option<PlanError>,
}
6258
6259impl<'a> AggregateTableFuncVisitor<'a> {
6260 fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6261 AggregateTableFuncVisitor {
6262 scx,
6263 aggs: Vec::new(),
6264 within_aggregate: false,
6265 tables: BTreeMap::new(),
6266 table_disallowed_context: Vec::new(),
6267 in_select_item: false,
6268 id_gen: Default::default(),
6269 err: None,
6270 }
6271 }
6272
6273 fn into_result(
6274 self,
6275 ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6276 match self.err {
6277 Some(err) => Err(err),
6278 None => {
6279 let mut seen = BTreeSet::new();
6282 let aggs = self
6283 .aggs
6284 .into_iter()
6285 .filter(move |agg| seen.insert(agg.clone()))
6286 .collect();
6287 Ok((aggs, self.tables))
6288 }
6289 }
6290 }
6291}
6292
impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
    /// Collects non-window aggregate calls and tracks the contexts in which
    /// table functions are disallowed.
    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
        let item = match self.scx.get_item_by_resolved_name(&func.name) {
            Ok(i) => i,
            // Unresolvable names are skipped here without recording an error.
            Err(_) => return,
        };

        match item.func() {
            // Aggregates with an OVER clause are not collected; they fall
            // through to the default arm below.
            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
                if self.within_aggregate {
                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
                    return;
                }
                self.aggs.push(func.clone());
                let Function {
                    name: _,
                    args,
                    filter,
                    over: _,
                    distinct: _,
                } = func;
                // The FILTER expression is visited before `within_aggregate`
                // is set, so it is not treated as being inside the aggregate.
                if let Some(filter) = filter {
                    self.visit_expr_mut(filter);
                }
                let old_within_aggregate = self.within_aggregate;
                self.within_aggregate = true;
                self.table_disallowed_context
                    .push("aggregate function calls");

                self.visit_function_args_mut(args);

                // Restore the state that was in effect before the arguments.
                self.within_aggregate = old_within_aggregate;
                self.table_disallowed_context.pop();
            }
            Ok(Func::Table { .. }) => {
                // Table functions may not be nested inside table functions.
                self.table_disallowed_context.push("other table functions");
                visit_mut::visit_function_mut(self, func);
                self.table_disallowed_context.pop();
            }
            _ => visit_mut::visit_function_mut(self, func),
        }
    }

    /// Subqueries are not descended into.
    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
    }

    /// Replaces table function calls in select items with generated
    /// identifiers and marks `CASE` and `COALESCE` as contexts in which table
    /// functions are disallowed.
    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
        let (disallowed_context, func) = match expr {
            Expr::Case { .. } => (Some("CASE"), None),
            Expr::HomogenizingFunction {
                function: HomogenizingFunction::Coalesce,
                ..
            } => (Some("COALESCE"), None),
            Expr::Function(func) if self.in_select_item => {
                // Remember a clone of the call if it is a table function that
                // appears in an allowed position.
                let mut table_func = None;
                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
                    if let Ok(Func::Table { .. }) = item.func() {
                        if let Some(context) = self.table_disallowed_context.last() {
                            self.err = Some(sql_err!(
                                "table functions are not allowed in {} (function {})",
                                context,
                                func.name
                            ));
                            return;
                        }
                        table_func = Some(func.clone());
                    }
                }
                // No disallowed context is pushed here; `visit_function_mut`
                // pushes one while it descends into the table function.
                (None, table_func)
            }
            _ => (None, None),
        };
        if let Some(func) = func {
            // Visit the table function call before it is (possibly) replaced.
            visit_mut::visit_expr_mut(self, expr);
            // Only calls without FILTER, OVER and DISTINCT are replaced.
            // NOTE(review): a call that is not replaced is visited a second
            // time by the unconditional visit below; confirm this is
            // intended.
            if let Function {
                name: _,
                args: _,
                filter: None,
                over: None,
                distinct: false,
            } = &func
            {
                // An id is allocated for every occurrence, but an identical
                // call that is already in `tables` keeps its existing name.
                let unique_id = self.id_gen.allocate_id();
                let id = self
                    .tables
                    .entry(func)
                    .or_insert_with(|| format!("table_func_{unique_id}"));
                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
            }
        }
        if let Some(context) = disallowed_context {
            self.table_disallowed_context.push(context);
        }

        visit_mut::visit_expr_mut(self, expr);

        if disallowed_context.is_some() {
            self.table_disallowed_context.pop();
        }
    }

    /// Sets `in_select_item` for the duration of the visit of `si`.
    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
        let old = self.in_select_item;
        self.in_select_item = true;
        visit_mut::visit_select_item_mut(self, si);
        self.in_select_item = old;
    }
}
6414
6415#[derive(Default)]
6416struct WindowFuncCollector {
6417 window_funcs: Vec<Expr<Aug>>,
6418}
6419
6420impl WindowFuncCollector {
6421 fn into_result(self) -> Vec<Expr<Aug>> {
6422 let mut seen = BTreeSet::new();
6424 let window_funcs_dedupped = self
6425 .window_funcs
6426 .into_iter()
6427 .filter(move |expr| seen.insert(expr.clone()))
6428 .rev()
6431 .collect();
6432 window_funcs_dedupped
6433 }
6434}
6435
6436impl Visit<'_, Aug> for WindowFuncCollector {
6437 fn visit_expr(&mut self, expr: &Expr<Aug>) {
6438 match expr {
6439 Expr::Function(func) => {
6440 if func.over.is_some() {
6441 self.window_funcs.push(expr.clone());
6442 }
6443 }
6444 _ => (),
6445 }
6446 visit::visit_expr(self, expr);
6447 }
6448
6449 fn visit_query(&mut self, _query: &Query<Aug>) {
6450 }
6452}
6453
/// Lifetime category of a planned query.
///
/// The variants are told apart by the three predicates below:
/// [`QueryLifetime::is_one_shot`], [`QueryLifetime::is_maintained`] and
/// [`QueryLifetime::allow_show`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueryLifetime {
    /// One-shot; not maintained; `allow_show` is `true`.
    OneShot,
    /// Maintained; `allow_show` is `false`.
    Index,
    /// Maintained; `allow_show` is `false`.
    MaterializedView,
    /// Maintained; the only maintained lifetime for which `allow_show` is
    /// `true`.
    Subscribe,
    /// Maintained; `allow_show` is `false`.
    View,
    /// Maintained; `allow_show` is `false`.
    Source,
}

impl QueryLifetime {
    /// Returns `true` only for [`QueryLifetime::OneShot`].
    ///
    /// # Panics
    ///
    /// Panics if the answer is not the exact negation of
    /// [`QueryLifetime::is_maintained`]; the two matches are kept exhaustive
    /// so that a new variant has to be classified in both.
    pub fn is_one_shot(&self) -> bool {
        let one_shot = match self {
            QueryLifetime::OneShot => true,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::Subscribe
            | QueryLifetime::View
            | QueryLifetime::Source => false,
        };
        assert_eq!(!one_shot, self.is_maintained());
        one_shot
    }

    /// Returns `true` for every variant except [`QueryLifetime::OneShot`].
    pub fn is_maintained(&self) -> bool {
        match self {
            QueryLifetime::OneShot => false,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::Subscribe
            | QueryLifetime::View
            | QueryLifetime::Source => true,
        }
    }

    /// Returns `true` for [`QueryLifetime::OneShot`] and
    /// [`QueryLifetime::Subscribe`], `false` for all other variants.
    ///
    /// NOTE(review): presumably this gates `SHOW` commands inside such
    /// queries -- confirm against the call sites.
    pub fn allow_show(&self) -> bool {
        match self {
            QueryLifetime::OneShot | QueryLifetime::Subscribe => true,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::View
            | QueryLifetime::Source => false,
        }
    }
}
6513
6514#[derive(Debug, Clone)]
6516pub struct CteDesc {
6517 pub name: String,
6518 pub desc: RelationDesc,
6519}
6520
6521#[derive(Debug, Clone)]
6523pub struct QueryContext<'a> {
6524 pub scx: &'a StatementContext<'a>,
6526 pub lifetime: QueryLifetime,
6528 pub outer_scopes: Vec<Scope>,
6530 pub outer_relation_types: Vec<SqlRelationType>,
6532 pub ctes: BTreeMap<LocalId, CteDesc>,
6534 pub name_manager: Rc<RefCell<NameManager>>,
6536 pub recursion_guard: RecursionGuard,
6537}
6538
6539impl CheckedRecursion for QueryContext<'_> {
6540 fn recursion_guard(&self) -> &RecursionGuard {
6541 &self.recursion_guard
6542 }
6543}
6544
6545impl<'a> QueryContext<'a> {
6546 pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6547 QueryContext {
6548 scx,
6549 lifetime,
6550 outer_scopes: vec![],
6551 outer_relation_types: vec![],
6552 ctes: BTreeMap::new(),
6553 name_manager: Rc::new(RefCell::new(NameManager::new())),
6554 recursion_guard: RecursionGuard::with_limit(1024), }
6556 }
6557
6558 fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6559 expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6560 }
6561
6562 fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6565 let ctes = self.ctes.clone();
6566 let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6567 let outer_relation_types = iter::once(relation_type)
6568 .chain(self.outer_relation_types.clone())
6569 .collect();
6570 let name_manager = Rc::clone(&self.name_manager);
6572
6573 QueryContext {
6574 scx: self.scx,
6575 lifetime: self.lifetime,
6576 outer_scopes,
6577 outer_relation_types,
6578 ctes,
6579 name_manager,
6580 recursion_guard: self.recursion_guard.clone(),
6581 }
6582 }
6583
6584 fn empty_derived_context(&self) -> QueryContext<'a> {
6586 let scope = Scope::empty();
6587 let ty = SqlRelationType::empty();
6588 self.derived_context(scope, ty)
6589 }
6590
6591 pub fn resolve_table_name(
6594 &self,
6595 object: ResolvedItemName,
6596 ) -> Result<(HirRelationExpr, Scope), PlanError> {
6597 match object {
6598 ResolvedItemName::Item {
6599 id,
6600 full_name,
6601 version,
6602 ..
6603 } => {
6604 let item = self.scx.get_item(&id).at_version(version);
6605 let desc = match item.relation_desc() {
6606 Some(desc) => desc.clone(),
6607 None => {
6608 return Err(PlanError::InvalidDependency {
6609 name: full_name.to_string(),
6610 item_type: item.item_type().to_string(),
6611 });
6612 }
6613 };
6614 let expr = HirRelationExpr::Get {
6615 id: Id::Global(item.global_id()),
6616 typ: desc.typ().clone(),
6617 };
6618
6619 let name = full_name.into();
6620 let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6621
6622 Ok((expr, scope))
6623 }
6624 ResolvedItemName::Cte { id, name } => {
6625 let name = name.into();
6626 let cte = self.ctes.get(&id).unwrap();
6627 let expr = HirRelationExpr::Get {
6628 id: Id::Local(id),
6629 typ: cte.desc.typ().clone(),
6630 };
6631
6632 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6633
6634 Ok((expr, scope))
6635 }
6636 ResolvedItemName::ContinualTask { id, name } => {
6637 let cte = self.ctes.get(&id).unwrap();
6638 let expr = HirRelationExpr::Get {
6639 id: Id::Local(id),
6640 typ: cte.desc.typ().clone(),
6641 };
6642
6643 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6644
6645 Ok((expr, scope))
6646 }
6647 ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6648 }
6649 }
6650
6651 pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6654 self.scx.humanize_scalar_type(typ, postgres_compat)
6655 }
6656}
6657
6658#[derive(Debug, Clone)]
6660pub struct ExprContext<'a> {
6661 pub qcx: &'a QueryContext<'a>,
6662 pub name: &'a str,
6664 pub scope: &'a Scope,
6667 pub relation_type: &'a SqlRelationType,
6670 pub allow_aggregates: bool,
6672 pub allow_subqueries: bool,
6674 pub allow_parameters: bool,
6676 pub allow_windows: bool,
6678}
6679
6680impl CheckedRecursion for ExprContext<'_> {
6681 fn recursion_guard(&self) -> &RecursionGuard {
6682 &self.qcx.recursion_guard
6683 }
6684}
6685
6686impl<'a> ExprContext<'a> {
6687 pub fn catalog(&self) -> &dyn SessionCatalog {
6688 self.qcx.scx.catalog
6689 }
6690
6691 pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6692 let mut ecx = self.clone();
6693 ecx.name = name;
6694 ecx
6695 }
6696
6697 pub fn column_type<E>(&self, expr: &E) -> E::Type
6698 where
6699 E: AbstractExpr,
6700 {
6701 expr.typ(
6702 &self.qcx.outer_relation_types,
6703 self.relation_type,
6704 &self.qcx.scx.param_types.borrow(),
6705 )
6706 }
6707
6708 pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6709 where
6710 E: AbstractExpr,
6711 {
6712 self.column_type(expr).scalar_type()
6713 }
6714
6715 fn derived_query_context(&self) -> QueryContext<'_> {
6716 let mut scope = self.scope.clone();
6717 scope.lateral_barrier = true;
6718 self.qcx.derived_context(scope, self.relation_type.clone())
6719 }
6720
6721 pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6722 self.qcx.scx.require_feature_flag(flag)
6723 }
6724
6725 pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6726 &self.qcx.scx.param_types
6727 }
6728
6729 pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6732 self.qcx.scx.humanize_scalar_type(typ, postgres_compat)
6733 }
6734
6735 pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6736 self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6737 }
6738}
6739
/// String interner backed by an ordered set of shared `Arc<str>` values.
///
/// Implements `Default`, which yields an empty interner, so the type can be
/// used with `..Default::default()`, `#[derive(Default)]` on containing
/// types, `Option::unwrap_or_default` and similar generic code.
#[derive(Debug, Clone, Default)]
pub struct NameManager(BTreeSet<Arc<str>>);
6747
6748impl NameManager {
6749 pub fn new() -> Self {
6751 Self(BTreeSet::new())
6752 }
6753
6754 fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6757 let s = s.as_ref();
6758 if let Some(interned) = self.0.get(s) {
6759 Arc::clone(interned)
6760 } else {
6761 let interned: Arc<str> = Arc::from(s);
6762 self.0.insert(Arc::clone(&interned));
6763 interned
6764 }
6765 }
6766
6767 pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6770 self.intern(item.column_name.as_str())
6786 }
6787}
6788
#[cfg(test)]
mod test {
    use super::*;

    /// Equal strings must be interned to the same allocation, whatever buffer
    /// they originally came from; different strings must not share one.
    #[mz_ore::test]
    pub fn test_name_manager_string_interning() {
        let mut manager = NameManager::new();

        let literal_hi = "hi";
        let first_hi = manager.intern(literal_hi);
        let hello = manager.intern("hello");

        // Distinct strings live in distinct allocations.
        assert_ne!(first_hi.as_ptr(), hello.as_ptr());

        // Interning the same literal again yields the first allocation.
        let second_hi = manager.intern("hi");
        assert_eq!(first_hi.as_ptr(), second_hi.as_ptr());

        // Assemble "hi" at runtime so that it lives in its own buffer rather
        // than in the literal's storage.
        let rebuilt_hi = format!(
            "{}{}",
            first_hi.chars().next().unwrap(),
            second_hi.chars().nth(1).unwrap()
        );
        assert_ne!(literal_hi.as_ptr(), rebuilt_hi.as_ptr());

        // Equal content from a different buffer still maps to the first
        // allocation.
        let third_hi = manager.intern(rebuilt_hi);
        assert_eq!(first_hi.as_ptr(), third_hi.as_ptr());
    }
}