1use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::func::variadic::{
50 ArrayCreate, ArrayIndex, Coalesce, Greatest, Least, ListCreate, ListIndex, ListSliceLinear,
51 MapBuild, RecordCreate,
52};
53use mz_expr::virtual_syntax::AlgExcept;
54use mz_expr::{
55 Eval, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, REPEAT_ROW_NAME,
56 RowSetFinishing, TableFunc, func as expr_func,
57};
58use mz_ore::collections::CollectionExt;
59use mz_ore::error::ErrorExt;
60use mz_ore::id_gen::IdGen;
61use mz_ore::option::FallibleMapExt;
62use mz_ore::stack::{CheckedRecursion, RecursionGuard};
63use mz_ore::str::StrExt;
64use mz_repr::adt::char::CharLength;
65use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
66use mz_repr::adt::timestamp::TimestampPrecision;
67use mz_repr::adt::varchar::VarCharMaxLength;
68use mz_repr::namespaces::MZ_CATALOG_SCHEMA;
69use mz_repr::{
70 CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector,
71 ReprColumnType, Row, RowArena, SqlColumnType, SqlRelationType, SqlScalarType,
72 UNKNOWN_COLUMN_NAME, strconv,
73};
74use mz_sql_parser::ast::display::AstDisplay;
75use mz_sql_parser::ast::visit::Visit;
76use mz_sql_parser::ast::visit_mut::{self, VisitMut};
77use mz_sql_parser::ast::{
78 AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
79 CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
80 Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
81 JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
82 MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
83 SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
84 TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
85 WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
86};
87use mz_sql_parser::ident;
88
89use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
90use crate::func::{self, Func, FuncSpec, TableFuncImpl};
91use crate::names::{
92 Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
93};
94use crate::plan::PlanError::InvalidWmrRecursionLimit;
95use crate::plan::error::PlanError;
96use crate::plan::hir::{
97 AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
98 BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
99 HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
100 ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
101};
102use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
103use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
104use crate::plan::statement::{StatementContext, StatementDesc, show};
105use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
106use crate::plan::{
107 Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
108 literal, transform_ast,
109};
110use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
111use crate::session::vars::{self, FeatureFlag};
112use crate::{ORDINALITY_COL_NAME, normalize};
113
114#[derive(Debug)]
115pub struct PlannedRootQuery<E> {
116 pub expr: E,
117 pub desc: RelationDesc,
118 pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
119 pub scope: Scope,
120}
121
122#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
131pub fn plan_root_query(
132 scx: &StatementContext,
133 mut query: Query<Aug>,
134 lifetime: QueryLifetime,
135) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
136 transform_ast::transform(scx, &mut query)?;
137 let mut qcx = QueryContext::root(scx, lifetime);
138 let PlannedQuery {
139 mut expr,
140 scope,
141 order_by,
142 limit,
143 offset,
144 project,
145 group_size_hints,
146 } = plan_query(&mut qcx, &query)?;
147
148 let mut finishing = RowSetFinishing {
149 limit,
150 offset,
151 project,
152 order_by,
153 };
154
155 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
161
162 if lifetime.is_maintained() {
163 expr.finish_maintained(&mut finishing, group_size_hints);
164 }
165
166 let typ = qcx.relation_type(&expr);
167 let typ = SqlRelationType::new(
168 finishing
169 .project
170 .iter()
171 .map(|i| typ.column_types[*i].clone())
172 .collect(),
173 );
174 let desc = RelationDesc::new(typ, scope.column_names());
175
176 Ok(PlannedRootQuery {
177 expr,
178 desc,
179 finishing,
180 scope,
181 })
182}
183
184fn try_push_projection_order_by(
195 expr: &mut HirRelationExpr,
196 project: &mut Vec<usize>,
197 order_by: &mut Vec<ColumnOrder>,
198) -> bool {
199 let mut unproject = vec![None; expr.arity()];
200 for (out_i, in_i) in project.iter().copied().enumerate() {
201 unproject[in_i] = Some(out_i);
202 }
203 if order_by
204 .iter()
205 .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
206 {
207 let trivial_project = (0..project.len()).collect();
208 *expr = expr.take().project(mem::replace(project, trivial_project));
209 for ob in order_by {
210 ob.column = unproject[ob.column].unwrap();
211 }
212 true
213 } else {
214 false
215 }
216}
217
218pub fn plan_insert_query(
219 scx: &StatementContext,
220 table_name: ResolvedItemName,
221 columns: Vec<Ident>,
222 source: InsertSource<Aug>,
223 returning: Vec<SelectItem<Aug>>,
224) -> Result<
225 (
226 CatalogItemId,
227 HirRelationExpr,
228 PlannedRootQuery<Vec<HirScalarExpr>>,
229 ),
230 PlanError,
231> {
232 let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
233 let table = scx.get_item_by_resolved_name(&table_name)?;
234
235 if table.item_type() != CatalogItemType::Table {
237 sql_bail!(
238 "cannot insert into {} '{}'",
239 table.item_type(),
240 table_name.full_name_str()
241 );
242 }
243 let desc = table
244 .relation_desc()
245 .ok_or_else(|| sql_err!("item does not have a relation description"))?;
246 let mut defaults = table
247 .writable_table_details()
248 .ok_or_else(|| {
249 sql_err!(
250 "cannot insert into non-writeable table '{}'",
251 table_name.full_name_str()
252 )
253 })?
254 .to_vec();
255
256 for default in &mut defaults {
257 transform_ast::transform(scx, default)?;
258 }
259
260 if table.id().is_system() {
261 sql_bail!(
262 "cannot insert into system table '{}'",
263 table_name.full_name_str()
264 );
265 }
266
267 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
268
269 let mut source_types = Vec::with_capacity(columns.len());
271 let mut ordering = Vec::with_capacity(columns.len());
272
273 if columns.is_empty() {
274 source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
277 ordering.extend(0..desc.arity());
278 } else {
279 let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
280 .iter()
281 .enumerate()
282 .map(|(idx, (name, typ))| (name, (idx, typ)))
283 .collect();
284
285 for c in &columns {
286 if let Some((idx, typ)) = column_by_name.get(c) {
287 ordering.push(*idx);
288 source_types.push(&typ.scalar_type);
289 } else {
290 sql_bail!(
291 "column {} of relation {} does not exist",
292 c.quoted(),
293 table_name.full_name_str().quoted()
294 );
295 }
296 }
297 if let Some(dup) = columns.iter().duplicates().next() {
298 sql_bail!("column {} specified more than once", dup.quoted());
299 }
300 };
301
302 let expr = match source {
304 InsertSource::Query(mut query) => {
305 transform_ast::transform(scx, &mut query)?;
306
307 match query {
308 Query {
310 body: SetExpr::Values(Values(values)),
311 ctes,
312 order_by,
313 limit: None,
314 offset: None,
315 } if ctes.is_empty() && order_by.is_empty() => {
316 let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
317 plan_values_insert(&qcx, &names, &source_types, &values)?
318 }
319 _ => {
320 let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
321 expr
322 }
323 }
324 }
325 InsertSource::DefaultValues => {
326 HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
327 }
328 };
329
330 let expr_arity = expr.arity();
331
332 let max_columns = if columns.is_empty() {
335 desc.arity()
336 } else {
337 columns.len()
338 };
339 if expr_arity > max_columns {
340 sql_bail!("INSERT has more expressions than target columns");
341 }
342 if expr_arity < columns.len() {
344 sql_bail!("INSERT has more target columns than expressions");
345 }
346
347 source_types.truncate(expr_arity);
349 ordering.truncate(expr_arity);
350
351 let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
354 sql_err!(
355 "column {} is of type {} but expression is of type {}",
356 desc.get_name(ordering[e.column]).quoted(),
357 qcx.humanize_sql_scalar_type(&e.target_type, false),
358 qcx.humanize_sql_scalar_type(&e.source_type, false),
359 )
360 })?;
361
362 let mut map_exprs = vec![];
364 let mut project_key = Vec::with_capacity(desc.arity());
365
366 let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();
368
369 let column_details = desc.iter_types().zip_eq(defaults).enumerate();
370 for (col_idx, (col_typ, default)) in column_details {
371 if let Some(src_idx) = col_to_source.get(&col_idx) {
372 project_key.push(*src_idx);
373 } else {
374 let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
375 project_key.push(expr_arity + map_exprs.len());
376 map_exprs.push(hir);
377 }
378 }
379
380 let returning = {
381 let (scope, typ) = if let ResolvedItemName::Item {
382 full_name,
383 version: _,
384 ..
385 } = table_name
386 {
387 let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
388 let typ = desc.typ().clone();
389 (scope, typ)
390 } else {
391 (Scope::empty(), SqlRelationType::empty())
392 };
393 let ecx = &ExprContext {
394 qcx: &qcx,
395 name: "RETURNING clause",
396 scope: &scope,
397 relation_type: &typ,
398 allow_aggregates: false,
399 allow_subqueries: false,
400 allow_parameters: true,
401 allow_windows: false,
402 };
403 let table_func_names = BTreeMap::new();
404 let mut output_columns = vec![];
405 let mut new_exprs = vec![];
406 let mut new_type = SqlRelationType::empty();
407 for mut si in returning {
408 transform_ast::transform(scx, &mut si)?;
409 for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
410 let expr = match &select_item {
411 ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
412 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
413 };
414 output_columns.push(column_name);
415 let typ = ecx.column_type(&expr);
416 new_type.column_types.push(typ);
417 new_exprs.push(expr);
418 }
419 }
420 let desc = RelationDesc::new(new_type, output_columns);
421 let desc_arity = desc.arity();
422 PlannedRootQuery {
423 expr: new_exprs,
424 desc,
425 finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
426 scope,
427 }
428 };
429
430 Ok((
431 table.id(),
432 expr.map(map_exprs).project(project_key),
433 returning,
434 ))
435}
436
437pub fn plan_copy_item(
448 scx: &StatementContext,
449 item_name: ResolvedItemName,
450 columns: Vec<Ident>,
451) -> Result<
452 (
453 CatalogItemId,
454 RelationDesc,
455 Vec<ColumnIndex>,
456 Option<MapFilterProject>,
457 ),
458 PlanError,
459> {
460 let item = scx.get_item_by_resolved_name(&item_name)?;
461 let fullname = scx.catalog.resolve_full_name(item.name());
462 let table_desc = match item.relation_desc() {
463 Some(desc) => desc.into_owned(),
464 None => {
465 return Err(PlanError::InvalidDependency {
466 name: fullname.to_string(),
467 item_type: item.item_type().to_string(),
468 });
469 }
470 };
471 let mut ordering = Vec::with_capacity(columns.len());
472
473 let mfp = if let Some(table_defaults) = item.writable_table_details() {
483 let mut table_defaults = table_defaults.to_vec();
484
485 for default in &mut table_defaults {
486 transform_ast::transform(scx, default)?;
487 }
488
489 let source_column_names: Vec<_> = columns
491 .iter()
492 .cloned()
493 .map(normalize::column_name)
494 .collect();
495
496 let mut default_exprs = Vec::new();
497 let mut project_keys = Vec::with_capacity(table_desc.arity());
498
499 let column_details = table_desc.iter().zip_eq(table_defaults);
502 for ((col_name, col_type), col_default) in column_details {
503 let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
504 if let Some(src_idx) = maybe_src_idx {
505 project_keys.push(src_idx);
506 } else {
507 let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
510 let mir = hir.lower_uncorrelated(scx.catalog.system_vars())?;
511 project_keys.push(source_column_names.len() + default_exprs.len());
512 default_exprs.push(mir);
513 }
514 }
515
516 let mfp = MapFilterProject::new(source_column_names.len())
517 .map(default_exprs)
518 .project(project_keys);
519 Some(mfp)
520 } else {
521 None
522 };
523
524 let source_desc = if columns.is_empty() {
526 let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
527 ordering.extend(indexes);
528
529 table_desc
531 } else {
532 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
533 let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
534 .iter_all()
535 .map(|(idx, name, typ)| (name, (*idx, typ)))
536 .collect();
537
538 let mut names = Vec::with_capacity(columns.len());
539 let mut source_types = Vec::with_capacity(columns.len());
540
541 for c in &columns {
542 if let Some((idx, typ)) = column_by_name.get(c) {
543 ordering.push(*idx);
544 source_types.push((*typ).clone());
545 names.push(c.clone());
546 } else {
547 sql_bail!(
548 "column {} of relation {} does not exist",
549 c.quoted(),
550 item_name.full_name_str().quoted()
551 );
552 }
553 }
554 if let Some(dup) = columns.iter().duplicates().next() {
555 sql_bail!("column {} specified more than once", dup.quoted());
556 }
557
558 RelationDesc::new(SqlRelationType::new(source_types), names)
560 };
561
562 Ok((item.id(), source_desc, ordering, mfp))
563}
564
565pub fn plan_copy_from(
569 scx: &StatementContext,
570 table_name: ResolvedItemName,
571 columns: Vec<Ident>,
572) -> Result<
573 (
574 CatalogItemId,
575 RelationDesc,
576 Vec<ColumnIndex>,
577 Option<MapFilterProject>,
578 ),
579 PlanError,
580> {
581 let table = scx.get_item_by_resolved_name(&table_name)?;
582
583 if table.item_type() != CatalogItemType::Table {
585 sql_bail!(
586 "cannot insert into {} '{}'",
587 table.item_type(),
588 table_name.full_name_str()
589 );
590 }
591
592 let _ = table.writable_table_details().ok_or_else(|| {
593 sql_err!(
594 "cannot insert into non-writeable table '{}'",
595 table_name.full_name_str()
596 )
597 })?;
598
599 if table.id().is_system() {
600 sql_bail!(
601 "cannot insert into system table '{}'",
602 table_name.full_name_str()
603 );
604 }
605 let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
606
607 Ok((id, desc, ordering, mfp))
608}
609
610pub fn plan_copy_from_rows(
613 pcx: &PlanContext,
614 catalog: &dyn SessionCatalog,
615 target_id: CatalogItemId,
616 target_name: String,
617 columns: Vec<ColumnIndex>,
618 rows: Vec<mz_repr::Row>,
619) -> Result<HirRelationExpr, PlanError> {
620 let scx = StatementContext::new(Some(pcx), catalog);
621
622 let table = catalog
624 .try_get_item(&target_id)
625 .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
626 .at_version(RelationVersionSelector::Latest);
627
628 let mut defaults = table
629 .writable_table_details()
630 .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
631 .to_vec();
632
633 for default in &mut defaults {
634 transform_ast::transform(&scx, default)?;
635 }
636
637 let desc = table
638 .relation_desc()
639 .ok_or_else(|| sql_err!("item does not have a relation description"))?;
640 let column_types = columns
641 .iter()
642 .map(|x| desc.get_type(x).clone())
643 .map(|mut x| {
644 x.nullable = true;
647 x
648 })
649 .collect();
650 let typ = SqlRelationType::new(column_types);
651 let expr = HirRelationExpr::Constant {
652 rows,
653 typ: typ.clone(),
654 };
655
656 let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
662 if columns == default {
663 return Ok(expr);
664 }
665
666 let mut map_exprs = vec![];
668 let mut project_key = Vec::with_capacity(desc.arity());
669
670 let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
672
673 let column_details = desc.iter_all().zip_eq(defaults);
674 for ((col_idx, _col_name, col_typ), default) in column_details {
675 if let Some(src_idx) = col_to_source.get(&col_idx) {
676 project_key.push(*src_idx);
677 } else {
678 let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
679 project_key.push(typ.arity() + map_exprs.len());
680 map_exprs.push(hir);
681 }
682 }
683
684 Ok(expr.map(map_exprs).project(project_key))
685}
686
687pub struct ReadThenWritePlan {
689 pub id: CatalogItemId,
690 pub selection: HirRelationExpr,
695 pub assignments: BTreeMap<usize, HirScalarExpr>,
697 pub finishing: RowSetFinishing,
698}
699
700pub fn plan_delete_query(
701 scx: &StatementContext,
702 mut delete_stmt: DeleteStatement<Aug>,
703) -> Result<ReadThenWritePlan, PlanError> {
704 transform_ast::transform(scx, &mut delete_stmt)?;
705
706 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
707 plan_mutation_query_inner(
708 qcx,
709 delete_stmt.table_name,
710 delete_stmt.alias,
711 delete_stmt.using,
712 vec![],
713 delete_stmt.selection,
714 )
715}
716
717pub fn plan_update_query(
718 scx: &StatementContext,
719 mut update_stmt: UpdateStatement<Aug>,
720) -> Result<ReadThenWritePlan, PlanError> {
721 transform_ast::transform(scx, &mut update_stmt)?;
722
723 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
724
725 plan_mutation_query_inner(
726 qcx,
727 update_stmt.table_name,
728 update_stmt.alias,
729 vec![],
730 update_stmt.assignments,
731 update_stmt.selection,
732 )
733}
734
735pub fn plan_mutation_query_inner(
736 qcx: QueryContext,
737 table_name: ResolvedItemName,
738 alias: Option<TableAlias>,
739 using: Vec<TableWithJoins<Aug>>,
740 assignments: Vec<Assignment<Aug>>,
741 selection: Option<Expr<Aug>>,
742) -> Result<ReadThenWritePlan, PlanError> {
743 let (id, version) = match table_name {
745 ResolvedItemName::Item { id, version, .. } => (id, version),
746 _ => sql_bail!("cannot mutate non-user table"),
747 };
748
749 let item = qcx.scx.get_item(&id).at_version(version);
751 if item.item_type() != CatalogItemType::Table {
752 sql_bail!(
753 "cannot mutate {} '{}'",
754 item.item_type(),
755 table_name.full_name_str()
756 );
757 }
758 let _ = item.writable_table_details().ok_or_else(|| {
759 sql_err!(
760 "cannot mutate non-writeable table '{}'",
761 table_name.full_name_str()
762 )
763 })?;
764 if id.is_system() {
765 sql_bail!(
766 "cannot mutate system table '{}'",
767 table_name.full_name_str()
768 );
769 }
770
771 let (mut get, scope) = qcx.resolve_table_name(table_name)?;
773 let scope = plan_table_alias(scope, alias.as_ref())?;
774 let desc = item.relation_desc().expect("table has desc");
775 let relation_type = qcx.relation_type(&get);
776
777 if using.is_empty() {
778 if let Some(expr) = selection {
779 let ecx = &ExprContext {
780 qcx: &qcx,
781 name: "WHERE clause",
782 scope: &scope,
783 relation_type: &relation_type,
784 allow_aggregates: false,
785 allow_subqueries: true,
786 allow_parameters: true,
787 allow_windows: false,
788 };
789 let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
790 get = get.filter(vec![expr]);
791 }
792 } else {
793 get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
794 }
795
796 let mut sets = BTreeMap::new();
797 for Assignment { id, value } in assignments {
798 let name = normalize::column_name(id);
800 match desc.get_by_name(&name) {
801 Some((idx, typ)) => {
802 let ecx = &ExprContext {
803 qcx: &qcx,
804 name: "SET clause",
805 scope: &scope,
806 relation_type: &relation_type,
807 allow_aggregates: false,
808 allow_subqueries: false,
809 allow_parameters: true,
810 allow_windows: false,
811 };
812 let expr = plan_expr(ecx, &value)?.cast_to(
813 ecx,
814 CastContext::Assignment,
815 &typ.scalar_type,
816 )?;
817
818 if sets.insert(idx, expr).is_some() {
819 sql_bail!("column {} set twice", name)
820 }
821 }
822 None => sql_bail!("unknown column {}", name),
823 };
824 }
825
826 let finishing = RowSetFinishing {
827 order_by: vec![],
828 limit: None,
829 offset: 0,
830 project: (0..desc.arity()).collect(),
831 };
832
833 Ok(ReadThenWritePlan {
834 id,
835 selection: get,
836 finishing,
837 assignments: sets,
838 })
839}
840
841fn handle_mutation_using_clause(
853 qcx: &QueryContext,
854 selection: Option<Expr<Aug>>,
855 using: Vec<TableWithJoins<Aug>>,
856 get: HirRelationExpr,
857 outer_scope: Scope,
858) -> Result<HirRelationExpr, PlanError> {
859 let (mut using_rel_expr, using_scope) =
863 using.into_iter().try_fold(plan_join_identity(), |l, twj| {
864 let (left, left_scope) = l;
865 plan_join(
866 qcx,
867 left,
868 left_scope,
869 &Join {
870 relation: TableFactor::NestedJoin {
871 join: Box::new(twj),
872 alias: None,
873 },
874 join_operator: JoinOperator::CrossJoin,
875 },
876 )
877 })?;
878
879 if let Some(expr) = selection {
880 let on = HirScalarExpr::literal_true();
886 let joined = using_rel_expr
887 .clone()
888 .join(get.clone(), on, JoinKind::Inner);
889 let joined_scope = using_scope.product(outer_scope)?;
890 let joined_relation_type = qcx.relation_type(&joined);
891
892 let ecx = &ExprContext {
893 qcx,
894 name: "WHERE clause",
895 scope: &joined_scope,
896 relation_type: &joined_relation_type,
897 allow_aggregates: false,
898 allow_subqueries: true,
899 allow_parameters: true,
900 allow_windows: false,
901 };
902
903 let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
905
906 let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
910 use mz_expr::visit::Visit;
912 expr.visit_mut_post(&mut |e| {
913 if let HirScalarExpr::Column(c, _name) = e {
914 if c.column >= using_rel_arity {
915 c.level += 1;
916 c.column -= using_rel_arity;
917 };
918 }
919 });
920
921 using_rel_expr = using_rel_expr.filter(vec![expr]);
925 } else {
926 let _joined_scope = using_scope.product(outer_scope)?;
929 }
930 Ok(get.filter(vec![using_rel_expr.exists()]))
941}
942
943#[derive(Debug)]
944pub(crate) struct CastRelationError {
945 pub(crate) column: usize,
946 pub(crate) source_type: SqlScalarType,
947 pub(crate) target_type: SqlScalarType,
948}
949
950pub(crate) fn cast_relation<'a, I>(
954 qcx: &QueryContext,
955 ccx: CastContext,
956 expr: HirRelationExpr,
957 target_types: I,
958) -> Result<HirRelationExpr, CastRelationError>
959where
960 I: IntoIterator<Item = &'a SqlScalarType>,
961{
962 let ecx = &ExprContext {
963 qcx,
964 name: "values",
965 scope: &Scope::empty(),
966 relation_type: &qcx.relation_type(&expr),
967 allow_aggregates: false,
968 allow_subqueries: true,
969 allow_parameters: true,
970 allow_windows: false,
971 };
972 let mut map_exprs = vec![];
973 let mut project_key = vec![];
974 for (i, target_typ) in target_types.into_iter().enumerate() {
975 let expr = HirScalarExpr::column(i);
976 match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
980 Ok(cast_expr) => {
981 if expr == cast_expr {
982 project_key.push(i);
984 } else {
985 project_key.push(ecx.relation_type.arity() + map_exprs.len());
987 map_exprs.push(cast_expr);
988 }
989 }
990 Err(_) => {
991 return Err(CastRelationError {
992 column: i,
993 source_type: ecx.scalar_type(&expr),
994 target_type: target_typ.clone(),
995 });
996 }
997 }
998 }
999 Ok(expr.map(map_exprs).project(project_key))
1000}
1001
1002pub fn plan_as_of(
1005 scx: &StatementContext,
1006 as_of: Option<AsOf<Aug>>,
1007) -> Result<QueryWhen, PlanError> {
1008 match as_of {
1009 None => Ok(QueryWhen::Immediately),
1010 Some(as_of) => match as_of {
1011 AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1012 AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1013 },
1014 }
1015}
1016
1017pub fn plan_as_of_or_up_to(
1027 scx: &StatementContext,
1028 mut expr: Expr<Aug>,
1029) -> Result<mz_repr::Timestamp, PlanError> {
1030 let scope = Scope::empty();
1031 let desc = RelationDesc::empty();
1032 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1035 transform_ast::transform(scx, &mut expr)?;
1036 let ecx = &ExprContext {
1037 qcx: &qcx,
1038 name: "AS OF or UP TO",
1039 scope: &scope,
1040 relation_type: desc.typ(),
1041 allow_aggregates: false,
1042 allow_subqueries: false,
1043 allow_parameters: false,
1044 allow_windows: false,
1045 };
1046 let hir = plan_expr(ecx, &expr)?.cast_to(
1047 ecx,
1048 CastContext::Assignment,
1049 &SqlScalarType::MzTimestamp,
1050 )?;
1051 if hir.contains_unmaterializable() {
1052 bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1053 }
1054 let timestamp = hir
1061 .into_literal_mz_timestamp()
1062 .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1063 Ok(timestamp)
1064}
1065
1066pub fn plan_secret_as(
1068 scx: &StatementContext,
1069 mut expr: Expr<Aug>,
1070) -> Result<MirScalarExpr, PlanError> {
1071 let scope = Scope::empty();
1072 let desc = RelationDesc::empty();
1073 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1074
1075 transform_ast::transform(scx, &mut expr)?;
1076
1077 let ecx = &ExprContext {
1078 qcx: &qcx,
1079 name: "AS",
1080 scope: &scope,
1081 relation_type: desc.typ(),
1082 allow_aggregates: false,
1083 allow_subqueries: false,
1084 allow_parameters: false,
1085 allow_windows: false,
1086 };
1087 let expr = plan_expr(ecx, &expr)?
1088 .type_as(ecx, &SqlScalarType::Bytes)?
1089 .lower_uncorrelated(scx.catalog.system_vars())?;
1090 Ok(expr)
1091}
1092
1093pub fn plan_webhook_validate_using(
1095 scx: &StatementContext,
1096 validate_using: CreateWebhookSourceCheck<Aug>,
1097) -> Result<WebhookValidation, PlanError> {
1098 let qcx = QueryContext::root(scx, QueryLifetime::Source);
1099
1100 let CreateWebhookSourceCheck {
1101 options,
1102 using: mut expr,
1103 } = validate_using;
1104
1105 let mut column_typs = vec![];
1106 let mut column_names = vec![];
1107
1108 let (bodies, headers, secrets) = options
1109 .map(|o| (o.bodies, o.headers, o.secrets))
1110 .unwrap_or_default();
1111
1112 let mut body_tuples = vec![];
1114 for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1115 let scalar_type = use_bytes
1116 .then_some(SqlScalarType::Bytes)
1117 .unwrap_or(SqlScalarType::String);
1118 let name = alias
1119 .map(|a| a.into_string())
1120 .unwrap_or_else(|| "body".to_string());
1121
1122 column_typs.push(SqlColumnType {
1123 scalar_type,
1124 nullable: false,
1125 });
1126 column_names.push(name);
1127
1128 let column_idx = column_typs.len() - 1;
1130 assert_eq!(
1132 column_idx,
1133 column_names.len() - 1,
1134 "body column names and types don't match"
1135 );
1136 body_tuples.push((column_idx, use_bytes));
1137 }
1138
1139 let mut header_tuples = vec![];
1141
1142 for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1143 let value_type = use_bytes
1144 .then_some(SqlScalarType::Bytes)
1145 .unwrap_or(SqlScalarType::String);
1146 let name = alias
1147 .map(|a| a.into_string())
1148 .unwrap_or_else(|| "headers".to_string());
1149
1150 column_typs.push(SqlColumnType {
1151 scalar_type: SqlScalarType::Map {
1152 value_type: Box::new(value_type),
1153 custom_id: None,
1154 },
1155 nullable: false,
1156 });
1157 column_names.push(name);
1158
1159 let column_idx = column_typs.len() - 1;
1161 assert_eq!(
1163 column_idx,
1164 column_names.len() - 1,
1165 "header column names and types don't match"
1166 );
1167 header_tuples.push((column_idx, use_bytes));
1168 }
1169
1170 let mut validation_secrets = vec![];
1172
1173 for CreateWebhookSourceSecret {
1174 secret,
1175 alias,
1176 use_bytes,
1177 } in secrets
1178 {
1179 let scalar_type = use_bytes
1181 .then_some(SqlScalarType::Bytes)
1182 .unwrap_or(SqlScalarType::String);
1183
1184 column_typs.push(SqlColumnType {
1185 scalar_type,
1186 nullable: false,
1187 });
1188 let ResolvedItemName::Item {
1189 id,
1190 full_name: FullItemName { item, .. },
1191 ..
1192 } = secret
1193 else {
1194 return Err(PlanError::InvalidSecret(Box::new(secret)));
1195 };
1196
1197 let name = if let Some(alias) = alias {
1199 alias.into_string()
1200 } else {
1201 item
1202 };
1203 column_names.push(name);
1204
1205 let column_idx = column_typs.len() - 1;
1208 assert_eq!(
1210 column_idx,
1211 column_names.len() - 1,
1212 "column names and types don't match"
1213 );
1214
1215 validation_secrets.push(WebhookValidationSecret {
1216 id,
1217 column_idx,
1218 use_bytes,
1219 });
1220 }
1221
1222 let relation_typ = SqlRelationType::new(column_typs);
1223 let desc = RelationDesc::new(relation_typ, column_names.clone());
1224 let scope = Scope::from_source(None, column_names);
1225
1226 transform_ast::transform(scx, &mut expr)?;
1227
1228 let ecx = &ExprContext {
1229 qcx: &qcx,
1230 name: "CHECK",
1231 scope: &scope,
1232 relation_type: desc.typ(),
1233 allow_aggregates: false,
1234 allow_subqueries: false,
1235 allow_parameters: false,
1236 allow_windows: false,
1237 };
1238 let expr = plan_expr(ecx, &expr)?
1239 .type_as(ecx, &SqlScalarType::Bool)?
1240 .lower_uncorrelated(scx.catalog.system_vars())?;
1241 let validation = WebhookValidation {
1242 expression: expr,
1243 relation_desc: desc,
1244 bodies: body_tuples,
1245 headers: header_tuples,
1246 secrets: validation_secrets,
1247 };
1248 Ok(validation)
1249}
1250
1251pub fn plan_default_expr(
1252 scx: &StatementContext,
1253 expr: &Expr<Aug>,
1254 target_ty: &SqlScalarType,
1255) -> Result<HirScalarExpr, PlanError> {
1256 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1257 let ecx = &ExprContext {
1258 qcx: &qcx,
1259 name: "DEFAULT expression",
1260 scope: &Scope::empty(),
1261 relation_type: &SqlRelationType::empty(),
1262 allow_aggregates: false,
1263 allow_subqueries: false,
1264 allow_parameters: false,
1265 allow_windows: false,
1266 };
1267 let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1268 Ok(hir)
1269}
1270
1271pub fn plan_params<'a>(
1272 scx: &'a StatementContext,
1273 params: Vec<Expr<Aug>>,
1274 desc: &StatementDesc,
1275) -> Result<Params, PlanError> {
1276 if params.len() != desc.param_types.len() {
1277 sql_bail!(
1278 "expected {} params, got {}",
1279 desc.param_types.len(),
1280 params.len()
1281 );
1282 }
1283
1284 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1285
1286 let mut datums = Row::default();
1287 let mut packer = datums.packer();
1288 let mut actual_types = Vec::new();
1289 let temp_storage = &RowArena::new();
1290 for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1291 transform_ast::transform(scx, &mut expr)?;
1292
1293 let ecx = execute_expr_context(&qcx);
1294 let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1295 let actual_ty = ecx.scalar_type(&ex);
1296 if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1297 return Err(PlanError::WrongParameterType(
1298 i + 1,
1299 ecx.humanize_sql_scalar_type(expected_ty, false),
1300 ecx.humanize_sql_scalar_type(&actual_ty, false),
1301 ));
1302 }
1303 let ex = ex.lower_uncorrelated(scx.catalog.system_vars())?;
1304 let evaled = ex.eval(&[], temp_storage)?;
1305 packer.push(evaled);
1306 actual_types.push(actual_ty);
1307 }
1308 Ok(Params {
1309 datums,
1310 execute_types: actual_types,
1311 expected_types: desc.param_types.clone(),
1312 })
1313}
1314
1315static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
1316static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1317
1318pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1320 ExprContext {
1321 qcx,
1322 name: "EXECUTE",
1323 scope: &EXECUTE_CONTEXT_SCOPE,
1324 relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1325 allow_aggregates: false,
1326 allow_subqueries: false,
1327 allow_parameters: false,
1328 allow_windows: false,
1329 }
1330}
1331
1332pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
1337 LazyLock::new(|| CastContext::Assignment);
1338
1339pub fn plan_index_exprs<'a>(
1340 scx: &'a StatementContext,
1341 on_desc: &RelationDesc,
1342 exprs: Vec<Expr<Aug>>,
1343) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1344 let scope = Scope::from_source(None, on_desc.iter_names());
1345 let qcx = QueryContext::root(scx, QueryLifetime::Index);
1346
1347 let ecx = &ExprContext {
1348 qcx: &qcx,
1349 name: "CREATE INDEX",
1350 scope: &scope,
1351 relation_type: on_desc.typ(),
1352 allow_aggregates: false,
1353 allow_subqueries: false,
1354 allow_parameters: false,
1355 allow_windows: false,
1356 };
1357 let repr_col_types: Vec<ReprColumnType> = on_desc
1358 .typ()
1359 .column_types
1360 .iter()
1361 .map(ReprColumnType::from)
1362 .collect();
1363 let mut out = vec![];
1364 for mut expr in exprs {
1365 transform_ast::transform(scx, &mut expr)?;
1366 let expr = plan_expr_or_col_index(ecx, &expr)?;
1367 let mut expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
1368 expr.reduce(&repr_col_types);
1369 out.push(expr);
1370 }
1371 Ok(out)
1372}
1373
1374fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1375 match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1376 Some(column) => Ok(HirScalarExpr::column(column)),
1377 _ => plan_expr(ecx, e)?.type_as_any(ecx),
1378 }
1379}
1380
1381fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1382 match e {
1383 Expr::Value(Value::Number(n)) => {
1384 let n = n.parse::<usize>().map_err(|e| {
1385 sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1386 })?;
1387 if n < 1 || n > max {
1388 sql_bail!(
1389 "column reference {} in {} is out of range (1 - {})",
1390 n,
1391 name,
1392 max
1393 );
1394 }
1395 Ok(Some(n - 1))
1396 }
1397 _ => Ok(None),
1398 }
1399}
1400
1401struct PlannedQuery {
1402 expr: HirRelationExpr,
1403 scope: Scope,
1404 order_by: Vec<ColumnOrder>,
1405 limit: Option<HirScalarExpr>,
1406 offset: HirScalarExpr,
1412 project: Vec<usize>,
1413 group_size_hints: GroupSizeHints,
1414}
1415
1416fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1417 qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1418}
1419
1420fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1421 let cte_bindings = plan_ctes(qcx, q)?;
1424
1425 let limit = match &q.limit {
1426 None => None,
1427 Some(Limit {
1428 quantity,
1429 with_ties: false,
1430 }) => {
1431 let ecx = &ExprContext {
1432 qcx,
1433 name: "LIMIT",
1434 scope: &Scope::empty(),
1435 relation_type: &SqlRelationType::empty(),
1436 allow_aggregates: false,
1437 allow_subqueries: true,
1438 allow_parameters: true,
1439 allow_windows: false,
1440 };
1441 let limit = plan_expr(ecx, quantity)?;
1442 let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1443
1444 let limit = if limit.is_constant() {
1445 let arena = RowArena::new();
1446 let limit = limit.lower_uncorrelated(qcx.scx.catalog.system_vars())?;
1447
1448 match limit.eval(&[], &arena)? {
1452 d @ Datum::Int64(v) if v >= 0 => {
1453 HirScalarExpr::literal(d, SqlScalarType::Int64)
1454 }
1455 d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1456 Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1457 _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1458 }
1459 } else {
1460 qcx.scx
1462 .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1463 limit
1464 };
1465
1466 Some(limit)
1467 }
1468 Some(Limit {
1469 quantity: _,
1470 with_ties: true,
1471 }) => bail_unsupported!("FETCH ... WITH TIES"),
1472 };
1473
1474 let offset = match &q.offset {
1475 None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1476 Some(offset) => {
1477 let ecx = &ExprContext {
1478 qcx,
1479 name: "OFFSET",
1480 scope: &Scope::empty(),
1481 relation_type: &SqlRelationType::empty(),
1482 allow_aggregates: false,
1483 allow_subqueries: false,
1484 allow_parameters: true,
1485 allow_windows: false,
1486 };
1487 let offset = plan_expr(ecx, offset)?;
1488 let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1489
1490 let offset = if offset.is_constant() {
1491 let offset_value = offset_into_value(offset)?;
1493 HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1494 } else {
1495 if !offset.contains_parameters() {
1499 return Err(PlanError::InvalidOffset(format!(
1500 "must be simplifiable to a constant, possibly after parameter binding, got {}",
1501 offset
1502 )));
1503 }
1504 offset
1505 };
1506 offset
1507 }
1508 };
1509
1510 let mut planned_query = match &q.body {
1511 SetExpr::Select(s) => {
1512 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1514 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1515
1516 let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1517 PlannedQuery {
1518 expr: plan.expr,
1519 scope: plan.scope,
1520 order_by: plan.order_by,
1521 project: plan.project,
1522 limit,
1523 offset,
1524 group_size_hints,
1525 }
1526 }
1527 _ => {
1528 let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1529 let ecx = &ExprContext {
1530 qcx,
1531 name: "ORDER BY clause of a set expression",
1532 scope: &scope,
1533 relation_type: &qcx.relation_type(&expr),
1534 allow_aggregates: false,
1535 allow_subqueries: true,
1536 allow_parameters: true,
1537 allow_windows: false,
1538 };
1539 let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1540 let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1541 let project = (0..ecx.relation_type.arity()).collect();
1542 PlannedQuery {
1543 expr: expr.map(map_exprs),
1544 scope,
1545 order_by,
1546 limit,
1547 project,
1548 offset,
1549 group_size_hints: GroupSizeHints::default(),
1550 }
1551 }
1552 };
1553
1554 match &q.ctes {
1556 CteBlock::Simple(_) => {
1557 for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1558 if let Some(cte) = qcx.ctes.remove(&id) {
1559 planned_query.expr = HirRelationExpr::Let {
1560 name: cte.name,
1561 id: id.clone(),
1562 value: Box::new(value),
1563 body: Box::new(planned_query.expr),
1564 };
1565 }
1566 if let Some(shadowed_val) = shadowed_val {
1567 qcx.ctes.insert(id, shadowed_val);
1568 }
1569 }
1570 }
1571 CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1572 let MutRecBlockOptionExtracted {
1573 recursion_limit,
1574 return_at_recursion_limit,
1575 error_at_recursion_limit,
1576 seen: _,
1577 } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1578 let limit = match (
1579 recursion_limit,
1580 return_at_recursion_limit,
1581 error_at_recursion_limit,
1582 ) {
1583 (None, None, None) => None,
1584 (Some(max_iters), None, None) => {
1585 Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT))
1586 }
1587 (None, Some(max_iters), None) => Some((max_iters, true)),
1588 (None, None, Some(max_iters)) => Some((max_iters, false)),
1589 _ => {
1590 return Err(InvalidWmrRecursionLimit(
1591 "More than one recursion limit given. \
1592 Please give at most one of RECURSION LIMIT, \
1593 ERROR AT RECURSION LIMIT, \
1594 RETURN AT RECURSION LIMIT."
1595 .to_owned(),
1596 ));
1597 }
1598 }
1599 .try_map(|(max_iters, return_at_limit)| {
1600 Ok::<LetRecLimit, PlanError>(LetRecLimit {
1601 max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit(
1602 "Recursion limit has to be greater than 0.".to_owned(),
1603 ))?,
1604 return_at_limit: *return_at_limit,
1605 })
1606 })?;
1607
1608 let mut bindings = Vec::new();
1609 for (id, value, shadowed_val) in cte_bindings.into_iter() {
1610 if let Some(cte) = qcx.ctes.remove(&id) {
1611 bindings.push((cte.name, id, value, cte.desc.into_typ()));
1612 }
1613 if let Some(shadowed_val) = shadowed_val {
1614 qcx.ctes.insert(id, shadowed_val);
1615 }
1616 }
1617 if !bindings.is_empty() {
1618 planned_query.expr = HirRelationExpr::LetRec {
1619 limit,
1620 bindings,
1621 body: Box::new(planned_query.expr),
1622 }
1623 }
1624 }
1625 }
1626
1627 Ok(planned_query)
1628}
1629
1630pub(crate) fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1632 let offset = offset
1633 .try_into_literal_int64()
1634 .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1635 if offset < 0 {
1636 return Err(negative_offset_error(offset));
1637 }
1638 Ok(offset)
1639}
1640
1641pub(crate) fn negative_offset_error(offset: i64) -> PlanError {
1642 PlanError::InvalidOffset(format!("must not be negative, got {}", offset))
1643}
1644
1645generate_extracted_config!(
1646 MutRecBlockOption,
1647 (RecursionLimit, u64),
1648 (ReturnAtRecursionLimit, u64),
1649 (ErrorAtRecursionLimit, u64)
1650);
1651
1652pub fn plan_ctes(
1657 qcx: &mut QueryContext,
1658 q: &Query<Aug>,
1659) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1660 let mut result = Vec::new();
1662 let mut shadowed_descs = BTreeMap::new();
1665
1666 if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1668 sql_bail!(
1669 "WITH query name {} specified more than once",
1670 normalize::ident_ref(ident).quoted()
1671 )
1672 }
1673
1674 match &q.ctes {
1675 CteBlock::Simple(ctes) => {
1676 for cte in ctes.iter() {
1678 let cte_name = normalize::ident(cte.alias.name.clone());
1679 let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1680 let typ = qcx.relation_type(&val);
1681 let mut desc = RelationDesc::new(typ, scope.column_names());
1682 plan_utils::maybe_rename_columns(
1683 format!("CTE {}", cte.alias.name),
1684 &mut desc,
1685 &cte.alias.columns,
1686 )?;
1687 let shadowed = qcx.ctes.insert(
1689 cte.id,
1690 CteDesc {
1691 name: cte_name,
1692 desc,
1693 },
1694 );
1695
1696 result.push((cte.id, val, shadowed));
1697 }
1698 }
1699 CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1700 for cte in ctes.iter() {
1702 let cte_name = normalize::ident(cte.name.clone());
1703 let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1704 for column in cte.columns.iter() {
1705 desc_columns.push((
1706 normalize::column_name(column.name.clone()),
1707 SqlColumnType {
1708 scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1709 nullable: true,
1710 },
1711 ));
1712 }
1713 let desc = RelationDesc::from_names_and_types(desc_columns);
1714 let shadowed = qcx.ctes.insert(
1715 cte.id,
1716 CteDesc {
1717 name: cte_name,
1718 desc,
1719 },
1720 );
1721 if let Some(shadowed) = shadowed {
1723 shadowed_descs.insert(cte.id, shadowed);
1724 }
1725 }
1726
1727 for cte in ctes.iter() {
1729 let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1730
1731 let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1732
1733 if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1734 sql_bail!(
1737 "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1738 );
1739 }
1740
1741 if !proposed_typ.keys.is_empty() {
1742 sql_bail!("[internal error]: WMR CTEs do not support keys");
1745 }
1746
1747 let derived_typ = qcx.relation_type(&val);
1749
1750 let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1751 let cte_name = normalize::ident(cte.name.clone());
1752 let proposed_typ = proposed_typ
1753 .column_types
1754 .iter()
1755 .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1756 .collect::<Vec<_>>();
1757 let inferred_typ = derived_typ
1758 .column_types
1759 .iter()
1760 .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1761 .collect::<Vec<_>>();
1762 Err(PlanError::RecursiveTypeMismatch(
1763 cte_name,
1764 proposed_typ,
1765 inferred_typ,
1766 ))
1767 };
1768
1769 if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1770 return type_err(proposed_typ, derived_typ);
1771 }
1772
1773 let val = match cast_relation(
1775 qcx,
1776 CastContext::Assignment,
1781 val,
1782 proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1783 ) {
1784 Ok(val) => val,
1785 Err(_) => return type_err(proposed_typ, derived_typ),
1786 };
1787
1788 result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1789 }
1790 }
1791 }
1792
1793 Ok(result)
1794}
1795
1796pub fn plan_nested_query(
1797 qcx: &mut QueryContext,
1798 q: &Query<Aug>,
1799) -> Result<(HirRelationExpr, Scope), PlanError> {
1800 let PlannedQuery {
1801 mut expr,
1802 scope,
1803 order_by,
1804 limit,
1805 offset,
1806 project,
1807 group_size_hints,
1808 } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1809 if limit.is_some()
1810 || !offset
1811 .clone()
1812 .try_into_literal_int64()
1813 .is_ok_and(|offset| offset == 0)
1814 {
1815 expr = HirRelationExpr::top_k(
1816 expr,
1817 vec![],
1818 order_by,
1819 limit,
1820 offset,
1821 group_size_hints.limit_input_group_size,
1822 );
1823 }
1824 Ok((expr.project(project), scope))
1825}
1826
1827fn plan_set_expr(
1828 qcx: &mut QueryContext,
1829 q: &SetExpr<Aug>,
1830) -> Result<(HirRelationExpr, Scope), PlanError> {
1831 match q {
1832 SetExpr::Select(select) => {
1833 let order_by_exprs = Vec::new();
1834 let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
1835 assert!(plan.order_by.is_empty());
1838 Ok((plan.expr.project(plan.project), plan.scope))
1839 }
1840 SetExpr::SetOperation {
1841 op,
1842 all,
1843 left,
1844 right,
1845 } => {
1846 let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
1848 let (right_expr, right_scope) =
1849 qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;
1850
1851 let left_type = qcx.relation_type(&left_expr);
1853 let right_type = qcx.relation_type(&right_expr);
1854 if left_type.arity() != right_type.arity() {
1855 sql_bail!(
1856 "each {} query must have the same number of columns: {} vs {}",
1857 op,
1858 left_type.arity(),
1859 right_type.arity(),
1860 );
1861 }
1862
1863 let left_ecx = &ExprContext {
1868 qcx,
1869 name: &op.to_string(),
1870 scope: &left_scope,
1871 relation_type: &left_type,
1872 allow_aggregates: false,
1873 allow_subqueries: false,
1874 allow_parameters: false,
1875 allow_windows: false,
1876 };
1877 let right_ecx = &ExprContext {
1878 qcx,
1879 name: &op.to_string(),
1880 scope: &right_scope,
1881 relation_type: &right_type,
1882 allow_aggregates: false,
1883 allow_subqueries: false,
1884 allow_parameters: false,
1885 allow_windows: false,
1886 };
1887 let mut left_casts = vec![];
1888 let mut right_casts = vec![];
1889 for (i, (left_type, right_type)) in left_type
1890 .column_types
1891 .iter()
1892 .zip_eq(right_type.column_types.iter())
1893 .enumerate()
1894 {
1895 let types = &[
1896 CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
1897 CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
1898 ];
1899 let target =
1900 typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
1901 match typeconv::plan_cast(
1902 left_ecx,
1903 CastContext::Implicit,
1904 HirScalarExpr::column(i),
1905 &target,
1906 ) {
1907 Ok(expr) => left_casts.push(expr),
1908 Err(_) => sql_bail!(
1909 "{} types {} and {} cannot be matched",
1910 op,
1911 qcx.humanize_sql_scalar_type(&left_type.scalar_type, false),
1912 qcx.humanize_sql_scalar_type(&target, false),
1913 ),
1914 }
1915 match typeconv::plan_cast(
1916 right_ecx,
1917 CastContext::Implicit,
1918 HirScalarExpr::column(i),
1919 &target,
1920 ) {
1921 Ok(expr) => right_casts.push(expr),
1922 Err(_) => sql_bail!(
1923 "{} types {} and {} cannot be matched",
1924 op,
1925 qcx.humanize_sql_scalar_type(&target, false),
1926 qcx.humanize_sql_scalar_type(&right_type.scalar_type, false),
1927 ),
1928 }
1929 }
1930 let lhs = if left_casts
1931 .iter()
1932 .enumerate()
1933 .any(|(i, e)| e != &HirScalarExpr::column(i))
1934 {
1935 let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
1936 left_expr.map(left_casts).project(project_key)
1937 } else {
1938 left_expr
1939 };
1940 let rhs = if right_casts
1941 .iter()
1942 .enumerate()
1943 .any(|(i, e)| e != &HirScalarExpr::column(i))
1944 {
1945 let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
1946 right_expr.map(right_casts).project(project_key)
1947 } else {
1948 right_expr
1949 };
1950
1951 let relation_expr = match op {
1952 SetOperator::Union => {
1953 if *all {
1954 lhs.union(rhs)
1955 } else {
1956 lhs.union(rhs).distinct()
1957 }
1958 }
1959 SetOperator::Except => Hir::except(all, lhs, rhs),
1960 SetOperator::Intersect => {
1961 let left_clone = lhs.clone();
1967 if *all {
1968 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1969 } else {
1970 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1971 .distinct()
1972 }
1973 }
1974 };
1975 let scope = Scope::from_source(
1976 None,
1977 left_scope.column_names(),
1979 );
1980
1981 Ok((relation_expr, scope))
1982 }
1983 SetExpr::Values(Values(values)) => plan_values(qcx, values),
1984 SetExpr::Table(name) => {
1985 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
1986 Ok((expr, scope))
1987 }
1988 SetExpr::Query(query) => {
1989 let (expr, scope) = plan_nested_query(qcx, query)?;
1990 Ok((expr, scope))
1991 }
1992 SetExpr::Show(stmt) => {
1993 if !qcx.lifetime.allow_show() {
2007 return Err(PlanError::ShowCommandInView);
2008 }
2009
2010 fn to_hirscope(
2013 plan: ShowCreatePlan,
2014 desc: StatementDesc,
2015 ) -> Result<(HirRelationExpr, Scope), PlanError> {
2016 let rows = vec![plan.row.iter().collect::<Vec<_>>()];
2017 let desc = desc.relation_desc.ok_or_else(|| {
2018 internal_err!("statement description missing relation descriptor")
2019 })?;
2020 let scope = Scope::from_source(None, desc.iter_names());
2021 let expr = HirRelationExpr::constant(rows, desc.into_typ());
2022 Ok((expr, scope))
2023 }
2024
2025 match stmt.clone() {
2026 ShowStatement::ShowColumns(stmt) => {
2027 show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
2028 }
2029 ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
2030 show::plan_show_create_connection(qcx.scx, stmt.clone())?,
2031 show::describe_show_create_connection(qcx.scx, stmt)?,
2032 ),
2033 ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
2034 show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
2035 show::describe_show_create_cluster(qcx.scx, stmt)?,
2036 ),
2037 ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
2038 show::plan_show_create_index(qcx.scx, stmt.clone())?,
2039 show::describe_show_create_index(qcx.scx, stmt)?,
2040 ),
2041 ShowStatement::ShowCreateSink(stmt) => to_hirscope(
2042 show::plan_show_create_sink(qcx.scx, stmt.clone())?,
2043 show::describe_show_create_sink(qcx.scx, stmt)?,
2044 ),
2045 ShowStatement::ShowCreateSource(stmt) => to_hirscope(
2046 show::plan_show_create_source(qcx.scx, stmt.clone())?,
2047 show::describe_show_create_source(qcx.scx, stmt)?,
2048 ),
2049 ShowStatement::ShowCreateTable(stmt) => to_hirscope(
2050 show::plan_show_create_table(qcx.scx, stmt.clone())?,
2051 show::describe_show_create_table(qcx.scx, stmt)?,
2052 ),
2053 ShowStatement::ShowCreateView(stmt) => to_hirscope(
2054 show::plan_show_create_view(qcx.scx, stmt.clone())?,
2055 show::describe_show_create_view(qcx.scx, stmt)?,
2056 ),
2057 ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
2058 show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
2059 show::describe_show_create_materialized_view(qcx.scx, stmt)?,
2060 ),
2061 ShowStatement::ShowCreateType(stmt) => to_hirscope(
2062 show::plan_show_create_type(qcx.scx, stmt.clone())?,
2063 show::describe_show_create_type(qcx.scx, stmt)?,
2064 ),
2065 ShowStatement::ShowObjects(stmt) => {
2066 show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
2067 }
2068 ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
2069 ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
2070 }
2071 }
2072 }
2073}
2074
2075fn plan_values(
2077 qcx: &QueryContext,
2078 values: &[Vec<Expr<Aug>>],
2079) -> Result<(HirRelationExpr, Scope), PlanError> {
2080 assert!(!values.is_empty());
2081
2082 let ecx = &ExprContext {
2083 qcx,
2084 name: "VALUES",
2085 scope: &Scope::empty(),
2086 relation_type: &SqlRelationType::empty(),
2087 allow_aggregates: false,
2088 allow_subqueries: true,
2089 allow_parameters: true,
2090 allow_windows: false,
2091 };
2092
2093 let ncols = values[0].len();
2094 let nrows = values.len();
2095
2096 let mut cols = vec![vec![]; ncols];
2099 for row in values {
2100 if row.len() != ncols {
2101 sql_bail!(
2102 "VALUES expression has varying number of columns: {} vs {}",
2103 row.len(),
2104 ncols
2105 );
2106 }
2107 for (i, v) in row.iter().enumerate() {
2108 cols[i].push(v);
2109 }
2110 }
2111
2112 let mut col_iters = Vec::with_capacity(ncols);
2114 let mut col_types = Vec::with_capacity(ncols);
2115 for col in &cols {
2116 let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2117 let mut col_type = ecx.column_type(&col[0]);
2118 for val in &col[1..] {
2119 col_type = col_type.sql_union(&ecx.column_type(val))?; }
2121 col_types.push(col_type);
2122 col_iters.push(col.into_iter());
2123 }
2124
2125 let mut exprs = vec![];
2127 for _ in 0..nrows {
2128 for i in 0..ncols {
2129 exprs.push(col_iters[i].next().unwrap());
2130 }
2131 }
2132 let out = HirRelationExpr::CallTable {
2133 func: TableFunc::Wrap {
2134 width: ncols,
2135 types: col_types,
2136 },
2137 exprs,
2138 };
2139
2140 let mut scope = Scope::empty();
2142 for i in 0..ncols {
2143 let name = format!("column{}", i + 1);
2144 scope.items.push(ScopeItem::from_column_name(name));
2145 }
2146
2147 Ok((out, scope))
2148}
2149
2150fn plan_values_insert(
2160 qcx: &QueryContext,
2161 target_names: &[&ColumnName],
2162 target_types: &[&SqlScalarType],
2163 values: &[Vec<Expr<Aug>>],
2164) -> Result<HirRelationExpr, PlanError> {
2165 assert!(!values.is_empty());
2166
2167 if !values.iter().map(|row| row.len()).all_equal() {
2168 sql_bail!("VALUES lists must all be the same length");
2169 }
2170
2171 let ecx = &ExprContext {
2172 qcx,
2173 name: "VALUES",
2174 scope: &Scope::empty(),
2175 relation_type: &SqlRelationType::empty(),
2176 allow_aggregates: false,
2177 allow_subqueries: true,
2178 allow_parameters: true,
2179 allow_windows: false,
2180 };
2181
2182 let mut exprs = vec![];
2183 let mut types = vec![];
2184 for row in values {
2185 if row.len() > target_names.len() {
2186 sql_bail!("INSERT has more expressions than target columns");
2187 }
2188 for (column, val) in row.into_iter().enumerate() {
2189 let target_type = &target_types[column];
2190 let val = plan_expr(ecx, val)?;
2191 let val = typeconv::plan_coerce(ecx, val, target_type)?;
2192 let source_type = &ecx.scalar_type(&val);
2193 let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2194 Ok(val) => val,
2195 Err(_) => sql_bail!(
2196 "column {} is of type {} but expression is of type {}",
2197 target_names[column].quoted(),
2198 qcx.humanize_sql_scalar_type(target_type, false),
2199 qcx.humanize_sql_scalar_type(source_type, false),
2200 ),
2201 };
2202 if column >= types.len() {
2203 types.push(ecx.column_type(&val));
2204 } else {
2205 types[column] = types[column].sql_union(&ecx.column_type(&val))?; }
2207 exprs.push(val);
2208 }
2209 }
2210
2211 Ok(HirRelationExpr::CallTable {
2212 func: TableFunc::Wrap {
2213 width: values[0].len(),
2214 types,
2215 },
2216 exprs,
2217 })
2218}
2219
2220fn plan_join_identity() -> (HirRelationExpr, Scope) {
2221 let typ = SqlRelationType::new(vec![]);
2222 let expr = HirRelationExpr::constant(vec![vec![]], typ);
2223 let scope = Scope::empty();
2224 (expr, scope)
2225}
2226
2227#[derive(Debug)]
2233struct SelectPlan {
2234 expr: HirRelationExpr,
2235 scope: Scope,
2236 order_by: Vec<ColumnOrder>,
2237 project: Vec<usize>,
2238}
2239
2240generate_extracted_config!(
2241 SelectOption,
2242 (ExpectedGroupSize, u64),
2243 (AggregateInputGroupSize, u64),
2244 (DistinctOnInputGroupSize, u64),
2245 (LimitInputGroupSize, u64)
2246);
2247
2248fn plan_select_from_where(
2266 qcx: &QueryContext,
2267 mut s: Select<Aug>,
2268 mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2269) -> Result<SelectPlan, PlanError> {
2270 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2277 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2278
2279 let (mut relation_expr, mut from_scope) =
2281 s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2282 let (left, left_scope) = l;
2283 plan_join(
2284 qcx,
2285 left,
2286 left_scope,
2287 &Join {
2288 relation: TableFactor::NestedJoin {
2289 join: Box::new(twj.clone()),
2290 alias: None,
2291 },
2292 join_operator: JoinOperator::CrossJoin,
2293 },
2294 )
2295 })?;
2296
2297 if let Some(selection) = &s.selection {
2299 let ecx = &ExprContext {
2300 qcx,
2301 name: "WHERE clause",
2302 scope: &from_scope,
2303 relation_type: &qcx.relation_type(&relation_expr),
2304 allow_aggregates: false,
2305 allow_subqueries: true,
2306 allow_parameters: true,
2307 allow_windows: false,
2308 };
2309 let expr = plan_expr(ecx, selection)
2310 .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2311 .type_as(ecx, &SqlScalarType::Bool)?;
2312 relation_expr = relation_expr.filter(vec![expr]);
2313 }
2314
2315 let (aggregates, table_funcs) = {
2318 let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2319 visitor.visit_select_mut(&mut s);
2320 for o in order_by_exprs.iter_mut() {
2321 visitor.visit_order_by_expr_mut(o);
2322 }
2323 visitor.into_result()?
2324 };
2325 let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2326 if !table_funcs.is_empty() {
2327 let (expr, scope) = plan_scalar_table_funcs(
2328 qcx,
2329 table_funcs,
2330 &mut table_func_names,
2331 &relation_expr,
2332 &from_scope,
2333 )?;
2334 relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2335 from_scope = from_scope.product(scope)?;
2336 }
2337
2338 let projection = {
2340 let ecx = &ExprContext {
2341 qcx,
2342 name: "SELECT clause",
2343 scope: &from_scope,
2344 relation_type: &qcx.relation_type(&relation_expr),
2345 allow_aggregates: true,
2346 allow_subqueries: true,
2347 allow_parameters: true,
2348 allow_windows: true,
2349 };
2350 let mut out = vec![];
2351 for si in &s.projection {
2352 if *si == SelectItem::Wildcard && s.from.is_empty() {
2353 sql_bail!("SELECT * with no tables specified is not valid");
2354 }
2355 out.extend(expand_select_item(ecx, si, &table_func_names)?);
2356 }
2357 out
2358 };
2359
2360 let (mut group_scope, select_all_mapping) = {
2364 let ecx = &ExprContext {
2366 qcx,
2367 name: "GROUP BY clause",
2368 scope: &from_scope,
2369 relation_type: &qcx.relation_type(&relation_expr),
2370 allow_aggregates: false,
2371 allow_subqueries: true,
2372 allow_parameters: true,
2373 allow_windows: false,
2374 };
2375 let mut group_key = vec![];
2376 let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2377 let mut group_hir_exprs = vec![];
2378 let mut group_scope = Scope::empty();
2379 let mut select_all_mapping = BTreeMap::new();
2380
2381 for group_expr in &s.group_by {
2382 let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2383 let new_column = group_key.len();
2384
2385 if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2396 if let Some(group_expr) = group_expr {
2400 existing_scope_item.exprs.insert(group_expr.clone());
2401 }
2402 continue;
2403 }
2404
2405 let mut scope_item = if let HirScalarExpr::Column(
2406 ColumnRef {
2407 level: 0,
2408 column: old_column,
2409 },
2410 _name,
2411 ) = &expr
2412 {
2413 select_all_mapping.insert(*old_column, new_column);
2419 let scope_item = ecx.scope.items[*old_column].clone();
2420 scope_item
2421 } else {
2422 ScopeItem::empty()
2423 };
2424
2425 if let Some(group_expr) = group_expr.cloned() {
2426 scope_item.exprs.insert(group_expr);
2427 }
2428
2429 group_key.push(from_scope.len() + group_exprs.len());
2430 group_hir_exprs.push(expr.clone());
2431 group_exprs.insert(expr, scope_item);
2432 }
2433
2434 assert_eq!(group_hir_exprs.len(), group_exprs.len());
2435 for expr in &group_hir_exprs {
2436 if let Some(scope_item) = group_exprs.remove(expr) {
2437 group_scope.items.push(scope_item);
2438 }
2439 }
2440
2441 let ecx = &ExprContext {
2443 qcx,
2444 name: "aggregate function",
2445 scope: &from_scope,
2446 relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2447 allow_aggregates: false,
2448 allow_subqueries: true,
2449 allow_parameters: true,
2450 allow_windows: false,
2451 };
2452 let mut agg_exprs = vec![];
2453 for sql_function in aggregates {
2454 if sql_function.over.is_some() {
2455 unreachable!(
2456 "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2457 );
2458 }
2459 agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2460 group_scope
2461 .items
2462 .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2463 }
2464 if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2465 relation_expr = relation_expr.map(group_hir_exprs).reduce(
2467 group_key,
2468 agg_exprs,
2469 group_size_hints.aggregate_input_group_size,
2470 );
2471
2472 for i in 0..from_scope.len() {
2478 if !select_all_mapping.contains_key(&i) {
2479 let scope_item = &ecx.scope.items[i];
2480 group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2481 table_name: scope_item.table_name.clone(),
2482 column_name: scope_item.column_name.clone(),
2483 allow_unqualified_references: scope_item.allow_unqualified_references,
2484 });
2485 }
2486 }
2487
2488 (group_scope, select_all_mapping)
2489 } else {
2490 (
2492 from_scope.clone(),
2493 (0..from_scope.len()).map(|i| (i, i)).collect(),
2494 )
2495 }
2496 };
2497
2498 if let Some(ref having) = s.having {
2500 let ecx = &ExprContext {
2501 qcx,
2502 name: "HAVING clause",
2503 scope: &group_scope,
2504 relation_type: &qcx.relation_type(&relation_expr),
2505 allow_aggregates: true,
2506 allow_subqueries: true,
2507 allow_parameters: true,
2508 allow_windows: false,
2509 };
2510 let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2511 relation_expr = relation_expr.filter(vec![expr]);
2512 }
2513
2514 let window_funcs = {
2527 let mut visitor = WindowFuncCollector::default();
2528 visitor.visit_select(&s);
2532 for o in order_by_exprs.iter() {
2533 visitor.visit_order_by_expr(o);
2534 }
2535 visitor.into_result()
2536 };
2537 for window_func in window_funcs {
2538 let ecx = &ExprContext {
2539 qcx,
2540 name: "window function",
2541 scope: &group_scope,
2542 relation_type: &qcx.relation_type(&relation_expr),
2543 allow_aggregates: true,
2544 allow_subqueries: true,
2545 allow_parameters: true,
2546 allow_windows: true,
2547 };
2548 relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2549 group_scope.items.push(ScopeItem::from_expr(window_func));
2550 }
2551 if let Some(ref qualify) = s.qualify {
2558 let ecx = &ExprContext {
2559 qcx,
2560 name: "QUALIFY clause",
2561 scope: &group_scope,
2562 relation_type: &qcx.relation_type(&relation_expr),
2563 allow_aggregates: true,
2564 allow_subqueries: true,
2565 allow_parameters: true,
2566 allow_windows: true,
2567 };
2568 let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2569 relation_expr = relation_expr.filter(vec![expr]);
2570 }
2571
2572 let output_columns = {
2574 let mut new_exprs = vec![];
2575 let mut new_type = qcx.relation_type(&relation_expr);
2576 let mut output_columns = vec![];
2577 for (select_item, column_name) in &projection {
2578 let ecx = &ExprContext {
2579 qcx,
2580 name: "SELECT clause",
2581 scope: &group_scope,
2582 relation_type: &new_type,
2583 allow_aggregates: true,
2584 allow_subqueries: true,
2585 allow_parameters: true,
2586 allow_windows: true,
2587 };
2588 let expr = match select_item {
2589 ExpandedSelectItem::InputOrdinal(i) => {
2590 if let Some(column) = select_all_mapping.get(i).copied() {
2591 HirScalarExpr::column(column)
2592 } else {
2593 return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2594 }
2595 }
2596 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2597 };
2598 if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2599 output_columns.push((column, column_name));
2601 } else {
2602 let typ = ecx.column_type(&expr);
2609 new_type.column_types.push(typ);
2610 new_exprs.push(expr);
2611 output_columns.push((group_scope.len(), column_name));
2612 group_scope
2613 .items
2614 .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2615 }
2616 }
2617 relation_expr = relation_expr.map(new_exprs);
2618 output_columns
2619 };
2620 let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2621
2622 let order_by = {
2624 let relation_type = qcx.relation_type(&relation_expr);
2625 let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2626 &ExprContext {
2627 qcx,
2628 name: "ORDER BY clause",
2629 scope: &group_scope,
2630 relation_type: &relation_type,
2631 allow_aggregates: true,
2632 allow_subqueries: true,
2633 allow_parameters: true,
2634 allow_windows: true,
2635 },
2636 &order_by_exprs,
2637 &output_columns,
2638 )?;
2639
2640 match s.distinct {
2641 None => relation_expr = relation_expr.map(map_exprs),
2642 Some(Distinct::EntireRow) => {
2643 if relation_type.arity() == 0 {
2644 sql_bail!("SELECT DISTINCT must have at least one column");
2645 }
2646 if !try_push_projection_order_by(
2650 &mut relation_expr,
2651 &mut project_key,
2652 &mut order_by,
2653 ) {
2654 sql_bail!(
2655 "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2656 );
2657 }
2658 assert!(map_exprs.is_empty());
2659 relation_expr = relation_expr.distinct();
2660 }
2661 Some(Distinct::On(exprs)) => {
2662 let ecx = &ExprContext {
2663 qcx,
2664 name: "DISTINCT ON clause",
2665 scope: &group_scope,
2666 relation_type: &qcx.relation_type(&relation_expr),
2667 allow_aggregates: true,
2668 allow_subqueries: true,
2669 allow_parameters: true,
2670 allow_windows: true,
2671 };
2672
2673 let mut distinct_exprs = vec![];
2674 for expr in &exprs {
2675 let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2676 distinct_exprs.push(expr);
2677 }
2678
2679 let mut distinct_key = vec![];
2680
2681 let arity = relation_type.arity();
2691 for ord in order_by.iter().take(distinct_exprs.len()) {
2692 let mut expr = &HirScalarExpr::column(ord.column);
2695 if ord.column >= arity {
2696 expr = &map_exprs[ord.column - arity];
2697 };
2698 match distinct_exprs.iter().position(move |e| e == expr) {
2699 None => sql_bail!(
2700 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2701 ),
2702 Some(pos) => {
2703 distinct_exprs.remove(pos);
2704 }
2705 }
2706 distinct_key.push(ord.column);
2707 }
2708
2709 for expr in distinct_exprs {
2711 let column = match expr {
2714 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2715 _ => {
2716 map_exprs.push(expr);
2717 arity + map_exprs.len() - 1
2718 }
2719 };
2720 distinct_key.push(column);
2721 }
2722
2723 let distinct_len = distinct_key.len();
2728 relation_expr = HirRelationExpr::top_k(
2729 relation_expr.map(map_exprs),
2730 distinct_key,
2731 order_by.iter().skip(distinct_len).cloned().collect(),
2732 Some(HirScalarExpr::literal(
2733 Datum::Int64(1),
2734 SqlScalarType::Int64,
2735 )),
2736 HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2737 group_size_hints.distinct_on_input_group_size,
2738 );
2739 }
2740 }
2741
2742 order_by
2743 };
2744
2745 let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2750
2751 Ok(SelectPlan {
2752 expr: relation_expr,
2753 scope,
2754 order_by,
2755 project: project_key,
2756 })
2757}
2758
2759fn plan_scalar_table_funcs(
2760 qcx: &QueryContext,
2761 table_funcs: BTreeMap<Function<Aug>, String>,
2762 table_func_names: &mut BTreeMap<String, Ident>,
2763 relation_expr: &HirRelationExpr,
2764 from_scope: &Scope,
2765) -> Result<(HirRelationExpr, Scope), PlanError> {
2766 let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2767
2768 for (table_func, id) in table_funcs.iter() {
2769 table_func_names.insert(
2770 id.clone(),
2771 Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2773 );
2774 }
2775 if table_funcs.len() == 1 {
2778 let (table_func, id) = table_funcs.iter().next().unwrap();
2779 let (expr, mut scope) =
2780 plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2781
2782 let num_cols = scope.len();
2784 for i in 0..scope.len() {
2785 scope.items[i].table_name = Some(PartialItemName {
2786 database: None,
2787 schema: None,
2788 item: id.clone(),
2789 });
2790 scope.items[i].from_single_column_function = num_cols == 1;
2791 scope.items[i].allow_unqualified_references = false;
2792 }
2793 return Ok((expr, scope));
2794 }
2795 if table_funcs.keys().any(is_repeat_row) {
2796 bail_unsupported!(format!(
2799 "{} in a SELECT clause with multiple table functions",
2800 REPEAT_ROW_NAME
2801 ));
2802 }
2803 let (expr, mut scope, num_cols) =
2805 plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2806
2807 let mut i = 0;
2809 for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2810 for _ in 0..num_cols {
2811 scope.items[i].table_name = Some(PartialItemName {
2812 database: None,
2813 schema: None,
2814 item: id.clone(),
2815 });
2816 scope.items[i].from_single_column_function = num_cols == 1;
2817 scope.items[i].allow_unqualified_references = false;
2818 i += 1;
2819 }
2820 scope.items[i].table_name = Some(PartialItemName {
2824 database: None,
2825 schema: None,
2826 item: id.clone(),
2827 });
2828 scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2829 scope.items[i].allow_unqualified_references = false;
2830 i += 1;
2831 }
2832 scope.items[i].allow_unqualified_references = false;
2834 Ok((expr, scope))
2835}
2836
2837fn plan_group_by_expr<'a>(
2844 ecx: &ExprContext,
2845 group_expr: &'a Expr<Aug>,
2846 projection: &'a [(ExpandedSelectItem, ColumnName)],
2847) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2848 let plan_projection = |column: usize| match &projection[column].0 {
2849 ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2850 ExpandedSelectItem::Expr(expr) => {
2851 Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2852 }
2853 };
2854
2855 if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2858 return plan_projection(column);
2859 }
2860
2861 match group_expr {
2865 Expr::Identifier(names) => match plan_identifier(ecx, names) {
2866 Err(PlanError::UnknownColumn {
2867 table: None,
2868 column,
2869 similar,
2870 }) => {
2871 let mut iter = projection.iter().map(|(_expr, name)| name);
2874 if let Some(i) = iter.position(|n| *n == column) {
2875 if iter.any(|n| *n == column) {
2876 Err(PlanError::AmbiguousColumn(column))
2877 } else {
2878 plan_projection(i)
2879 }
2880 } else {
2881 Err(PlanError::UnknownColumn {
2884 table: None,
2885 column,
2886 similar,
2887 })
2888 }
2889 }
2890 res => Ok((Some(group_expr), res?)),
2891 },
2892 _ => Ok((
2893 Some(group_expr),
2894 plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2895 )),
2896 }
2897}
2898
2899pub(crate) fn plan_order_by_exprs(
2907 ecx: &ExprContext,
2908 order_by_exprs: &[OrderByExpr<Aug>],
2909 output_columns: &[(usize, &ColumnName)],
2910) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2911 let mut order_by = vec![];
2912 let mut map_exprs = vec![];
2913 for obe in order_by_exprs {
2914 let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2915 let column = match expr {
2918 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2919 _ => {
2920 map_exprs.push(expr);
2921 ecx.relation_type.arity() + map_exprs.len() - 1
2922 }
2923 };
2924 order_by.push(resolve_desc_and_nulls_last(obe, column));
2925 }
2926 Ok((order_by, map_exprs))
2927}
2928
2929fn plan_order_by_or_distinct_expr(
2947 ecx: &ExprContext,
2948 expr: &Expr<Aug>,
2949 output_columns: &[(usize, &ColumnName)],
2950) -> Result<HirScalarExpr, PlanError> {
2951 if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2952 return Ok(HirScalarExpr::column(output_columns[i].0));
2953 }
2954
2955 if let Expr::Identifier(names) = expr {
2956 if let [name] = &names[..] {
2957 let name = normalize::column_name(name.clone());
2958 let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2959 if let Some((i, _)) = iter.next() {
2960 match iter.next() {
2961 Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2965 _ => return Ok(HirScalarExpr::column(*i)),
2966 }
2967 }
2968 }
2969 }
2970
2971 plan_expr(ecx, expr)?.type_as_any(ecx)
2972}
2973
2974fn plan_table_with_joins(
2975 qcx: &QueryContext,
2976 table_with_joins: &TableWithJoins<Aug>,
2977) -> Result<(HirRelationExpr, Scope), PlanError> {
2978 let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2979 for join in &table_with_joins.joins {
2980 let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2981 expr = new_expr;
2982 scope = new_scope;
2983 }
2984 Ok((expr, scope))
2985}
2986
2987fn plan_table_factor(
2988 qcx: &QueryContext,
2989 table_factor: &TableFactor<Aug>,
2990) -> Result<(HirRelationExpr, Scope), PlanError> {
2991 match table_factor {
2992 TableFactor::Table { name, alias } => {
2993 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2994 let scope = plan_table_alias(scope, alias.as_ref())?;
2995 Ok((expr, scope))
2996 }
2997
2998 TableFactor::Function {
2999 function,
3000 alias,
3001 with_ordinality,
3002 } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
3003
3004 TableFactor::RowsFrom {
3005 functions,
3006 alias,
3007 with_ordinality,
3008 } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
3009
3010 TableFactor::Derived {
3011 lateral,
3012 subquery,
3013 alias,
3014 } => {
3015 let mut qcx = (*qcx).clone();
3016 if !lateral {
3017 for scope in &mut qcx.outer_scopes {
3021 if scope.lateral_barrier {
3022 break;
3023 }
3024 scope.items.clear();
3025 }
3026 }
3027 qcx.outer_scopes[0].lateral_barrier = true;
3028 let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3029 let scope = plan_table_alias(scope, alias.as_ref())?;
3030 Ok((expr, scope))
3031 }
3032
3033 TableFactor::NestedJoin { join, alias } => {
3034 let (expr, scope) = plan_table_with_joins(qcx, join)?;
3035 let scope = plan_table_alias(scope, alias.as_ref())?;
3036 Ok((expr, scope))
3037 }
3038 }
3039}
3040
3041fn plan_rows_from(
3083 qcx: &QueryContext,
3084 functions: &[Function<Aug>],
3085 alias: Option<&TableAlias>,
3086 with_ordinality: bool,
3087) -> Result<(HirRelationExpr, Scope), PlanError> {
3088 if functions.iter().any(is_repeat_row) {
3090 bail_unsupported!(format!("{} in ROWS FROM", REPEAT_ROW_NAME));
3094 }
3095
3096 if let [function] = functions {
3099 return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3100 }
3101
3102 let (expr, mut scope, num_cols) = plan_rows_from_internal(
3106 qcx,
3107 functions,
3108 Some(functions[0].name.full_item_name().clone()),
3109 )?;
3110
3111 let mut columns = Vec::new();
3113 let mut offset = 0;
3114 for (idx, cols) in num_cols.into_iter().enumerate() {
3116 for i in 0..cols {
3117 columns.push(offset + i);
3118 }
3119 offset += cols + 1;
3120
3121 scope.items.remove(offset - idx - 1);
3124 }
3125
3126 if with_ordinality {
3129 columns.push(offset);
3130 } else {
3131 scope.items.pop();
3132 }
3133
3134 let expr = expr.project(columns);
3135
3136 let scope = plan_table_alias(scope, alias)?;
3137 Ok((expr, scope))
3138}
3139
3140fn is_repeat_row(f: &Function<Aug>) -> bool {
3141 f.name.full_name_str().as_str() == format!("{}.{}", MZ_CATALOG_SCHEMA, REPEAT_ROW_NAME)
3142}
3143
3144fn plan_rows_from_internal<'a>(
3167 qcx: &QueryContext,
3168 functions: impl IntoIterator<Item = &'a Function<Aug>>,
3169 table_name: Option<FullItemName>,
3170) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3171 let mut functions = functions.into_iter();
3172 let mut num_cols = Vec::new();
3173
3174 let (mut left_expr, mut left_scope) =
3178 plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3179 num_cols.push(left_scope.len() - 1);
3180 left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3182 left_scope
3183 .items
3184 .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));
3185
3186 for function in functions {
3187 let qcx = qcx.empty_derived_context();
3189 let (right_expr, mut right_scope) =
3190 plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3191 num_cols.push(right_scope.len() - 1);
3192 let left_col = left_scope.len() - 1;
3193 let right_col = left_scope.len() + right_scope.len() - 1;
3194 let on = HirScalarExpr::call_binary(
3195 HirScalarExpr::column(left_col),
3196 HirScalarExpr::column(right_col),
3197 expr_func::Eq,
3198 );
3199 left_expr = left_expr
3200 .join(right_expr, on, JoinKind::FullOuter)
3201 .map(vec![HirScalarExpr::call_variadic(
3202 Coalesce,
3203 vec![
3204 HirScalarExpr::column(left_col),
3205 HirScalarExpr::column(right_col),
3206 ],
3207 )]);
3208
3209 left_expr = left_expr.project(
3212 (0..left_col) .chain(left_col + 1..right_col + 2) .collect(),
3215 );
3216 right_scope.items.push(left_scope.items.pop().unwrap());
3218
3219 left_scope.items.extend(right_scope.items);
3220 }
3221
3222 Ok((left_expr, left_scope, num_cols))
3223}
3224
3225fn plan_solitary_table_function(
3229 qcx: &QueryContext,
3230 function: &Function<Aug>,
3231 alias: Option<&TableAlias>,
3232 with_ordinality: bool,
3233) -> Result<(HirRelationExpr, Scope), PlanError> {
3234 let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3235
3236 let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3237 if single_column_function {
3238 let item = &mut scope.items[0];
3239
3240 item.from_single_column_function = true;
3243
3244 if let Some(alias) = alias {
3259 if let ScopeItem {
3260 table_name: Some(table_name),
3261 column_name,
3262 ..
3263 } = item
3264 {
3265 if table_name.item.as_str() == column_name.as_str() {
3266 *column_name = normalize::column_name(alias.name.clone());
3267 }
3268 }
3269 }
3270 }
3271
3272 let scope = plan_table_alias(scope, alias)?;
3273 Ok((expr, scope))
3274}
3275
3276fn plan_table_function_internal(
3281 qcx: &QueryContext,
3282 Function {
3283 name,
3284 args,
3285 filter,
3286 over,
3287 distinct,
3288 }: &Function<Aug>,
3289 with_ordinality: bool,
3290 table_name: Option<FullItemName>,
3291) -> Result<(HirRelationExpr, Scope), PlanError> {
3292 if filter.is_some() {
3297 sql_bail!("FILTER is not allowed for table functions in FROM");
3298 }
3299 if over.is_some() {
3300 sql_bail!("OVER is not allowed for table functions in FROM");
3301 }
3302 if *distinct {
3303 sql_bail!("DISTINCT is not allowed for table functions in FROM");
3304 }
3305
3306 let ecx = &ExprContext {
3307 qcx,
3308 name: "table function arguments",
3309 scope: &Scope::empty(),
3310 relation_type: &SqlRelationType::empty(),
3311 allow_aggregates: false,
3312 allow_subqueries: true,
3313 allow_parameters: true,
3314 allow_windows: false,
3315 };
3316
3317 let scalar_args = match args {
3318 FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3319 FunctionArgs::Args { args, order_by } => {
3320 if !order_by.is_empty() {
3321 sql_bail!(
3322 "ORDER BY specified, but {} is not an aggregate function",
3323 name
3324 );
3325 }
3326 plan_exprs(ecx, args)?
3327 }
3328 };
3329
3330 let table_name = match table_name {
3331 Some(table_name) => table_name.item,
3332 None => name.full_item_name().item.clone(),
3333 };
3334
3335 let scope_name = Some(PartialItemName {
3336 database: None,
3337 schema: None,
3338 item: table_name,
3339 });
3340
3341 let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3342 Func::Table(impls) => {
3343 let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3344 let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3345 let expr = match tf.imp {
3346 TableFuncImpl::CallTable { mut func, exprs } => {
3347 if with_ordinality {
3348 func = TableFunc::with_ordinality(func.clone()).ok_or(
3349 PlanError::Unsupported {
3350 feature: format!("WITH ORDINALITY on {}", func),
3351 discussion_no: None,
3352 },
3353 )?;
3354 }
3355 HirRelationExpr::CallTable { func, exprs }
3356 }
3357 TableFuncImpl::Expr(expr) => {
3358 if !with_ordinality {
3359 expr
3360 } else {
3361 if qcx
3365 .scx
3366 .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3367 {
3368 tracing::error!(
3372 %name,
3373 "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3374 );
3375 expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3376 func: WindowExprType::Scalar(ScalarWindowExpr {
3377 func: ScalarWindowFunc::RowNumber,
3378 order_by: vec![],
3379 }),
3380 partition_by: vec![],
3381 order_by: vec![],
3382 })])
3383 } else {
3384 bail_unsupported!(format!(
3385 "WITH ORDINALITY or ROWS FROM with {}",
3386 name
3387 ));
3388 }
3389 }
3390 }
3391 };
3392 (expr, scope)
3393 }
3394 Func::Scalar(impls) => {
3395 let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3396 let output = expr.typ(
3397 &qcx.outer_relation_types,
3398 &SqlRelationType::new(vec![]),
3399 &qcx.scx.param_types.borrow(),
3400 );
3401
3402 let relation = SqlRelationType::new(vec![output]);
3403
3404 let function_ident = Ident::new(name.full_item_name().item.clone())?;
3405 let column_name = normalize::column_name(function_ident);
3406 let name = column_name.to_string();
3407
3408 let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3409
3410 let mut func = TableFunc::TabletizedScalar { relation, name };
3411 if with_ordinality {
3412 func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3413 feature: format!("WITH ORDINALITY on {}", func),
3414 discussion_no: None,
3415 })?;
3416 }
3417 (
3418 HirRelationExpr::CallTable {
3419 func,
3420 exprs: vec![expr],
3421 },
3422 scope,
3423 )
3424 }
3425 o => sql_bail!(
3426 "{} functions are not supported in functions in FROM",
3427 o.class()
3428 ),
3429 };
3430
3431 if with_ordinality {
3432 scope
3433 .items
3434 .push(ScopeItem::from_name(scope_name, "ordinality"));
3435 }
3436
3437 Ok((expr, scope))
3438}
3439
3440fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3441 if let Some(TableAlias {
3442 name,
3443 columns,
3444 strict,
3445 }) = alias
3446 {
3447 if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3448 sql_bail!(
3449 "{} has {} columns available but {} columns specified",
3450 name,
3451 scope.items.len(),
3452 columns.len()
3453 );
3454 }
3455
3456 let table_name = normalize::ident(name.to_owned());
3457 for (i, item) in scope.items.iter_mut().enumerate() {
3458 item.table_name = if item.allow_unqualified_references {
3459 Some(PartialItemName {
3460 database: None,
3461 schema: None,
3462 item: table_name.clone(),
3463 })
3464 } else {
3465 None
3499 };
3500 item.column_name = columns
3501 .get(i)
3502 .map(|a| normalize::column_name(a.clone()))
3503 .unwrap_or_else(|| item.column_name.clone());
3504 }
3505 }
3506 Ok(scope)
3507}
3508
3509fn invent_column_name(
3513 ecx: &ExprContext,
3514 expr: &Expr<Aug>,
3515 table_func_names: &BTreeMap<String, Ident>,
3516) -> Result<Option<ColumnName>, PlanError> {
3517 #[derive(Debug)]
3524 enum NameQuality {
3525 Low,
3526 High,
3527 }
3528
3529 fn invent(
3530 ecx: &ExprContext,
3531 expr: &Expr<Aug>,
3532 table_func_names: &BTreeMap<String, Ident>,
3533 ) -> Result<Option<(ColumnName, NameQuality)>, PlanError> {
3534 Ok(match expr {
3535 Expr::Identifier(names) => {
3536 if let [name] = names.as_slice() {
3537 if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3538 return Ok(Some((
3539 normalize::column_name(table_func_name.clone()),
3540 NameQuality::High,
3541 )));
3542 }
3543 }
3544 names
3545 .last()
3546 .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3547 }
3548 Expr::Value(v) => match v {
3549 Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3552 Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3553 _ => None,
3554 },
3555 Expr::Function(func) => {
3556 let (schema, item) = match &func.name {
3557 ResolvedItemName::Item {
3558 qualifiers,
3559 full_name,
3560 ..
3561 } => (&qualifiers.schema_spec, full_name.item.clone()),
3562 _ => {
3565 bail_internal!("function name did not resolve to an item: {:?}", func.name)
3566 }
3567 };
3568
3569 if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3570 || schema
3571 == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3572 {
3573 None
3574 } else {
3575 Some((item.into(), NameQuality::High))
3576 }
3577 }
3578 Expr::HomogenizingFunction { function, .. } => Some((
3579 function.to_string().to_lowercase().into(),
3580 NameQuality::High,
3581 )),
3582 Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3583 Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3584 Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3585 Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3586 Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names)? {
3587 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3588 _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3589 },
3590 Expr::Case { else_result, .. } => {
3591 let inner = match else_result.as_ref() {
3592 Some(else_result) => invent(ecx, else_result, table_func_names)?,
3593 None => None,
3594 };
3595 match inner {
3596 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3597 _ => Some(("case".into(), NameQuality::Low)),
3598 }
3599 }
3600 Expr::FieldAccess { field, .. } => {
3601 Some((normalize::column_name(field.clone()), NameQuality::High))
3602 }
3603 Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3604 Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names)?,
3605 Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3606 let Ok((_expr, scope)) = plan_nested_query(&mut ecx.derived_query_context(), query)
3614 else {
3615 return Ok(None);
3616 };
3617 scope
3618 .items
3619 .first()
3620 .map(|name| (name.column_name.clone(), NameQuality::High))
3621 }
3622 Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3623 _ => None,
3624 })
3625 }
3626
3627 Ok(invent(ecx, expr, table_func_names)?.map(|(name, _quality)| name))
3628}
3629
3630#[derive(Debug)]
3631enum ExpandedSelectItem<'a> {
3632 InputOrdinal(usize),
3633 Expr(Cow<'a, Expr<Aug>>),
3634}
3635
3636impl ExpandedSelectItem<'_> {
3637 fn as_expr(&self) -> Option<&Expr<Aug>> {
3638 match self {
3639 ExpandedSelectItem::InputOrdinal(_) => None,
3640 ExpandedSelectItem::Expr(expr) => Some(expr),
3641 }
3642 }
3643}
3644
3645fn expand_select_item<'a>(
3646 ecx: &ExprContext,
3647 s: &'a SelectItem<Aug>,
3648 table_func_names: &BTreeMap<String, Ident>,
3649) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3650 match s {
3651 SelectItem::Expr {
3652 expr: Expr::QualifiedWildcard(table_name),
3653 alias: _,
3654 } => {
3655 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3656 let table_name =
3657 normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3658 let out: Vec<_> = ecx
3659 .scope
3660 .items
3661 .iter()
3662 .enumerate()
3663 .filter(|(_i, item)| item.is_from_table(&table_name))
3664 .map(|(i, item)| {
3665 let name = item.column_name.clone();
3666 (ExpandedSelectItem::InputOrdinal(i), name)
3667 })
3668 .collect();
3669 if out.is_empty() {
3670 sql_bail!("no table named '{}' in scope", table_name);
3671 }
3672 Ok(out)
3673 }
3674 SelectItem::Expr {
3675 expr: Expr::WildcardAccess(sql_expr),
3676 alias: _,
3677 } => {
3678 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3679 let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3685 let fields = match ecx.scalar_type(&expr) {
3686 SqlScalarType::Record { fields, .. } => fields,
3687 ty => sql_bail!(
3688 "type {} is not composite",
3689 ecx.humanize_sql_scalar_type(&ty, false)
3690 ),
3691 };
3692 let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3693 if let Expr::Identifier(ident) = sql_expr.as_ref() {
3694 if let [name] = ident.as_slice() {
3695 if let Ok(items) = ecx.scope.items_from_table(
3696 &[],
3697 &PartialItemName {
3698 database: None,
3699 schema: None,
3700 item: name.as_str().to_string(),
3701 },
3702 ) {
3703 for (_, item) in items {
3704 if item
3705 .is_exists_column_for_a_table_function_that_was_in_the_target_list
3706 {
3707 skip_cols.insert(item.column_name.clone());
3708 }
3709 }
3710 }
3711 }
3712 }
3713 let items = fields
3714 .iter()
3715 .filter_map(|(name, _ty)| {
3716 if skip_cols.contains(name) {
3717 None
3718 } else {
3719 let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3720 expr: sql_expr.clone(),
3721 field: name.clone().into(),
3722 }));
3723 Some((item, name.clone()))
3724 }
3725 })
3726 .collect();
3727 Ok(items)
3728 }
3729 SelectItem::Wildcard => {
3730 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3731 let items: Vec<_> = ecx
3732 .scope
3733 .items
3734 .iter()
3735 .enumerate()
3736 .filter(|(_i, item)| item.allow_unqualified_references)
3737 .map(|(i, item)| {
3738 let name = item.column_name.clone();
3739 (ExpandedSelectItem::InputOrdinal(i), name)
3740 })
3741 .collect();
3742
3743 Ok(items)
3744 }
3745 SelectItem::Expr { expr, alias } => {
3746 let name = match alias.clone().map(normalize::column_name) {
3747 Some(name) => name,
3748 None => invent_column_name(ecx, expr, table_func_names)?
3749 .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into()),
3750 };
3751 Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3752 }
3753 }
3754}
3755
3756fn plan_join(
3757 left_qcx: &QueryContext,
3758 left: HirRelationExpr,
3759 left_scope: Scope,
3760 join: &Join<Aug>,
3761) -> Result<(HirRelationExpr, Scope), PlanError> {
3762 const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3763 let (kind, constraint) = match &join.join_operator {
3764 JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3765 JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3766 JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3767 JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3768 JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3769 };
3770
3771 let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3772 if !kind.can_be_correlated() {
3773 for item in &mut right_qcx.outer_scopes[0].items {
3774 item.error_if_referenced =
3779 Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3780 table: table.cloned(),
3781 column: column.clone(),
3782 });
3783 }
3784 }
3785 let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3786
3787 let (expr, scope) = match constraint {
3788 JoinConstraint::On(expr) => {
3789 let product_scope = left_scope.product(right_scope)?;
3790 let ecx = &ExprContext {
3791 qcx: left_qcx,
3792 name: "ON clause",
3793 scope: &product_scope,
3794 relation_type: &SqlRelationType::new(
3795 left_qcx
3796 .relation_type(&left)
3797 .column_types
3798 .into_iter()
3799 .chain(right_qcx.relation_type(&right).column_types)
3800 .collect(),
3801 ),
3802 allow_aggregates: false,
3803 allow_subqueries: true,
3804 allow_parameters: true,
3805 allow_windows: false,
3806 };
3807 let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3808 let joined = left.join(right, on, kind);
3809 (joined, product_scope)
3810 }
3811 JoinConstraint::Using { columns, alias } => {
3812 let column_names = columns
3813 .iter()
3814 .map(|ident| normalize::column_name(ident.clone()))
3815 .collect::<Vec<_>>();
3816
3817 plan_using_constraint(
3818 &column_names,
3819 left_qcx,
3820 left,
3821 left_scope,
3822 &right_qcx,
3823 right,
3824 right_scope,
3825 kind,
3826 alias.as_ref(),
3827 )?
3828 }
3829 JoinConstraint::Natural => {
3830 *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3833 *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3834 let left_column_names = left_scope.column_names();
3835 let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3836 let column_names: Vec<_> = left_column_names
3837 .filter(|col| right_column_names.contains(col))
3838 .cloned()
3839 .collect();
3840 plan_using_constraint(
3841 &column_names,
3842 left_qcx,
3843 left,
3844 left_scope,
3845 &right_qcx,
3846 right,
3847 right_scope,
3848 kind,
3849 None,
3850 )?
3851 }
3852 };
3853 Ok((expr, scope))
3854}
3855
3856#[allow(clippy::too_many_arguments)]
3858fn plan_using_constraint(
3859 column_names: &[ColumnName],
3860 left_qcx: &QueryContext,
3861 left: HirRelationExpr,
3862 left_scope: Scope,
3863 right_qcx: &QueryContext,
3864 right: HirRelationExpr,
3865 right_scope: Scope,
3866 kind: JoinKind,
3867 alias: Option<&Ident>,
3868) -> Result<(HirRelationExpr, Scope), PlanError> {
3869 let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3870
3871 let mut unique_column_names = BTreeSet::new();
3874 for c in column_names {
3875 if !unique_column_names.insert(c) {
3876 return Err(PlanError::Unsupported {
3877 feature: format!(
3878 "column name {} appears more than once in USING clause",
3879 c.quoted()
3880 ),
3881 discussion_no: None,
3882 });
3883 }
3884 }
3885
3886 let alias_item_name = alias.map(|alias| PartialItemName {
3887 database: None,
3888 schema: None,
3889 item: alias.clone().to_string(),
3890 });
3891
3892 if let Some(alias_item_name) = &alias_item_name {
3893 for partial_item_name in both_scope.table_names() {
3894 if partial_item_name.matches(alias_item_name) {
3895 sql_bail!(
3896 "table name \"{}\" specified more than once",
3897 alias_item_name
3898 )
3899 }
3900 }
3901 }
3902
3903 let ecx = &ExprContext {
3904 qcx: right_qcx,
3905 name: "USING clause",
3906 scope: &both_scope,
3907 relation_type: &SqlRelationType::new(
3908 left_qcx
3909 .relation_type(&left)
3910 .column_types
3911 .into_iter()
3912 .chain(right_qcx.relation_type(&right).column_types)
3913 .collect(),
3914 ),
3915 allow_aggregates: false,
3916 allow_subqueries: false,
3917 allow_parameters: false,
3918 allow_windows: false,
3919 };
3920
3921 let mut join_exprs = vec![];
3922 let mut map_exprs = vec![];
3923 let mut new_items = vec![];
3924 let mut join_cols = vec![];
3925 let mut hidden_cols = vec![];
3926
3927 for column_name in column_names {
3928 let (lhs, lhs_name) = left_scope.resolve_using_column(
3930 column_name,
3931 JoinSide::Left,
3932 &mut left_qcx.name_manager.borrow_mut(),
3933 )?;
3934 let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3935 column_name,
3936 JoinSide::Right,
3937 &mut right_qcx.name_manager.borrow_mut(),
3938 )?;
3939
3940 rhs.column += left_scope.len();
3942
3943 let mut exprs = coerce_homogeneous_exprs(
3945 &ecx.with_name(&format!(
3946 "NATURAL/USING join column {}",
3947 column_name.quoted()
3948 )),
3949 vec![
3950 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3951 lhs,
3952 Arc::clone(&lhs_name),
3953 )),
3954 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3955 rhs,
3956 Arc::clone(&rhs_name),
3957 )),
3958 ],
3959 None,
3960 )?;
3961 let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3962
3963 match kind {
3964 JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3965 join_cols.push(lhs.column);
3966 hidden_cols.push(rhs.column);
3967 }
3968 JoinKind::RightOuter => {
3969 join_cols.push(rhs.column);
3970 hidden_cols.push(lhs.column);
3971 }
3972 JoinKind::FullOuter => {
3973 join_cols.push(both_scope.items.len() + map_exprs.len());
3976 hidden_cols.push(lhs.column);
3977 hidden_cols.push(rhs.column);
3978 map_exprs.push(HirScalarExpr::call_variadic(
3979 Coalesce,
3980 vec![expr1.clone(), expr2.clone()],
3981 ));
3982 new_items.push(ScopeItem::from_column_name(column_name));
3983 }
3984 }
3985
3986 if alias_item_name.is_some() {
3991 let new_item_col = both_scope.items.len() + new_items.len();
3992 join_cols.push(new_item_col);
3993 hidden_cols.push(new_item_col);
3994
3995 new_items.push(ScopeItem::from_name(
3996 alias_item_name.clone(),
3997 column_name.clone().to_string(),
3998 ));
3999
4000 let alias_expr = match kind {
4007 JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
4008 HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name))
4009 }
4010 JoinKind::RightOuter => HirScalarExpr::named_column(rhs, Arc::clone(&rhs_name)),
4011 JoinKind::FullOuter => {
4012 HirScalarExpr::call_variadic(Coalesce, vec![expr1.clone(), expr2.clone()])
4013 }
4014 };
4015 map_exprs.push(alias_expr);
4016 }
4017
4018 join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
4019 }
4020 both_scope.items.extend(new_items);
4021
4022 for c in hidden_cols {
4026 both_scope.items[c].allow_unqualified_references = false;
4027 }
4028
4029 let project_key = join_cols
4031 .into_iter()
4032 .chain(0..both_scope.items.len())
4033 .unique()
4034 .collect::<Vec<_>>();
4035
4036 both_scope = both_scope.project(&project_key);
4037
4038 let on = HirScalarExpr::variadic_and(join_exprs);
4039
4040 let both = left
4041 .join(right, on, kind)
4042 .map(map_exprs)
4043 .project(project_key);
4044 Ok((both, both_scope))
4045}
4046
4047pub fn plan_expr<'a>(
4048 ecx: &'a ExprContext,
4049 e: &Expr<Aug>,
4050) -> Result<CoercibleScalarExpr, PlanError> {
4051 ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
4052}
4053
4054fn plan_expr_inner<'a>(
4055 ecx: &'a ExprContext,
4056 e: &Expr<Aug>,
4057) -> Result<CoercibleScalarExpr, PlanError> {
4058 if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4059 return Ok(HirScalarExpr::named_column(
4061 i,
4062 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4063 )
4064 .into());
4065 }
4066
4067 match e {
4068 Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4070 Ok(plan_identifier(ecx, names)?.into())
4071 }
4072
4073 Expr::Value(val) => plan_literal(val),
4075 Expr::Parameter(n) => plan_parameter(ecx, *n),
4076 Expr::Array(exprs) => plan_array(ecx, exprs, None),
4077 Expr::List(exprs) => plan_list(ecx, exprs, None),
4078 Expr::Map(exprs) => plan_map(ecx, exprs, None),
4079 Expr::Row { exprs } => plan_row(ecx, exprs),
4080
4081 Expr::Op { op, expr1, expr2 } => {
4083 Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4084 }
4085 Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4086 Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4087
4088 Expr::Not { expr } => plan_not(ecx, expr),
4090 Expr::And { left, right } => plan_and(ecx, left, right),
4091 Expr::Or { left, right } => plan_or(ecx, left, right),
4092 Expr::IsExpr {
4093 expr,
4094 construct,
4095 negated,
4096 } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4097 Expr::Case {
4098 operand,
4099 conditions,
4100 results,
4101 else_result,
4102 } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4103 Expr::HomogenizingFunction { function, exprs } => {
4104 plan_homogenizing_function(ecx, function, exprs)
4105 }
4106 Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4107 ecx,
4108 &None,
4109 &[l_expr.clone().equals(*r_expr.clone())],
4110 &[Expr::null()],
4111 &Some(Box::new(*l_expr.clone())),
4112 )?
4113 .into()),
4114 Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4115 Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4116 Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4117 Expr::Like {
4118 expr,
4119 pattern,
4120 escape,
4121 case_insensitive,
4122 negated,
4123 } => Ok(plan_like(
4124 ecx,
4125 expr,
4126 pattern,
4127 escape.as_deref(),
4128 *case_insensitive,
4129 *negated,
4130 )?
4131 .into()),
4132
4133 Expr::InList {
4134 expr,
4135 list,
4136 negated,
4137 } => plan_in_list(ecx, expr, list, negated),
4138
4139 Expr::Exists(query) => plan_exists(ecx, query),
4141 Expr::Subquery(query) => plan_subquery(ecx, query),
4142 Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4143 Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4144 Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4145 Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4146 Expr::Nested(_) => bail_internal!("Expr::Nested should have been desugared"),
4147 Expr::InSubquery { .. } => {
4148 bail_internal!("Expr::InSubquery should have been desugared")
4149 }
4150 Expr::AnyExpr { .. } => {
4151 bail_internal!("Expr::AnyExpr should have been desugared")
4152 }
4153 Expr::AllExpr { .. } => {
4154 bail_internal!("Expr::AllExpr should have been desugared")
4155 }
4156 Expr::AnySubquery { .. } => {
4157 bail_internal!("Expr::AnySubquery should have been desugared")
4158 }
4159 Expr::AllSubquery { .. } => {
4160 bail_internal!("Expr::AllSubquery should have been desugared")
4161 }
4162 Expr::Between { .. } => {
4163 bail_internal!("Expr::Between should have been desugared")
4164 }
4165 }
4166}
4167
4168fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4169 if !ecx.allow_parameters {
4170 return Err(PlanError::UnknownParameter(n));
4174 }
4175 if n == 0 || n > 65536 {
4176 return Err(PlanError::UnknownParameter(n));
4177 }
4178 if ecx.param_types().borrow().contains_key(&n) {
4179 Ok(HirScalarExpr::parameter(n).into())
4180 } else {
4181 Ok(CoercibleScalarExpr::Parameter(n))
4182 }
4183}
4184
4185fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4186 let mut out = vec![];
4187 for e in exprs {
4188 out.push(plan_expr(ecx, e)?);
4189 }
4190 Ok(CoercibleScalarExpr::LiteralRecord(out))
4191}
4192
4193fn plan_cast(
4194 ecx: &ExprContext,
4195 expr: &Expr<Aug>,
4196 data_type: &ResolvedDataType,
4197) -> Result<CoercibleScalarExpr, PlanError> {
4198 let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4199 let expr = match expr {
4200 Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4209 Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4210 Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4211 _ => plan_expr(ecx, expr)?,
4212 };
4213 let ecx = &ecx.with_name("CAST");
4214 let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4215 let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4216 Ok(expr.into())
4217}
4218
4219fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4220 let ecx = ecx.with_name("NOT argument");
4221 Ok(plan_expr(&ecx, expr)?
4222 .type_as(&ecx, &SqlScalarType::Bool)?
4223 .call_unary(UnaryFunc::Not(expr_func::Not))
4224 .into())
4225}
4226
4227fn plan_and(
4228 ecx: &ExprContext,
4229 left: &Expr<Aug>,
4230 right: &Expr<Aug>,
4231) -> Result<CoercibleScalarExpr, PlanError> {
4232 let ecx = ecx.with_name("AND argument");
4233 Ok(HirScalarExpr::variadic_and(vec![
4234 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4235 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4236 ])
4237 .into())
4238}
4239
4240fn plan_or(
4241 ecx: &ExprContext,
4242 left: &Expr<Aug>,
4243 right: &Expr<Aug>,
4244) -> Result<CoercibleScalarExpr, PlanError> {
4245 let ecx = ecx.with_name("OR argument");
4246 Ok(HirScalarExpr::variadic_or(vec![
4247 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4248 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4249 ])
4250 .into())
4251}
4252
4253fn plan_in_list(
4254 ecx: &ExprContext,
4255 lhs: &Expr<Aug>,
4256 list: &Vec<Expr<Aug>>,
4257 negated: &bool,
4258) -> Result<CoercibleScalarExpr, PlanError> {
4259 let ecx = ecx.with_name("IN list");
4260 let or = HirScalarExpr::variadic_or(
4261 list.into_iter()
4262 .map(|e| {
4263 let eq = lhs.clone().equals(e.clone());
4264 plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4265 })
4266 .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4267 );
4268 Ok(if *negated {
4269 or.call_unary(UnaryFunc::Not(expr_func::Not))
4270 } else {
4271 or
4272 }
4273 .into())
4274}
4275
4276fn plan_homogenizing_function(
4277 ecx: &ExprContext,
4278 function: &HomogenizingFunction,
4279 exprs: &[Expr<Aug>],
4280) -> Result<CoercibleScalarExpr, PlanError> {
4281 assert!(!exprs.is_empty()); let expr = HirScalarExpr::call_variadic(
4283 match function {
4284 HomogenizingFunction::Coalesce => VariadicFunc::from(Coalesce),
4285 HomogenizingFunction::Greatest => VariadicFunc::from(Greatest),
4286 HomogenizingFunction::Least => VariadicFunc::from(Least),
4287 },
4288 coerce_homogeneous_exprs(
4289 &ecx.with_name(&function.to_string().to_lowercase()),
4290 plan_exprs(ecx, exprs)?,
4291 None,
4292 )?,
4293 );
4294 Ok(expr.into())
4295}
4296
4297fn plan_field_access(
4298 ecx: &ExprContext,
4299 expr: &Expr<Aug>,
4300 field: &Ident,
4301) -> Result<CoercibleScalarExpr, PlanError> {
4302 let field = normalize::column_name(field.clone());
4303 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4304 let ty = ecx.scalar_type(&expr);
4305 let i = match &ty {
4306 SqlScalarType::Record { fields, .. } => {
4307 fields.iter().position(|(name, _ty)| *name == field)
4308 }
4309 ty => sql_bail!(
4310 "column notation applied to type {}, which is not a composite type",
4311 ecx.humanize_sql_scalar_type(ty, false)
4312 ),
4313 };
4314 match i {
4315 None => sql_bail!(
4316 "field {} not found in data type {}",
4317 field,
4318 ecx.humanize_sql_scalar_type(&ty, false)
4319 ),
4320 Some(i) => Ok(expr
4321 .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4322 .into()),
4323 }
4324}
4325
4326fn plan_subscript(
4327 ecx: &ExprContext,
4328 expr: &Expr<Aug>,
4329 positions: &[SubscriptPosition<Aug>],
4330) -> Result<CoercibleScalarExpr, PlanError> {
4331 assert!(
4332 !positions.is_empty(),
4333 "subscript expression must contain at least one position"
4334 );
4335
4336 let ecx = &ecx.with_name("subscripting");
4337 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4338 let ty = ecx.scalar_type(&expr);
4339 match &ty {
4340 SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4341 ecx,
4342 expr,
4343 positions,
4344 if ty == SqlScalarType::Int2Vector {
4348 1
4349 } else {
4350 0
4351 },
4352 ),
4353 SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4354 SqlScalarType::List { element_type, .. } => {
4355 let elem_type_name = ecx.humanize_sql_scalar_type(element_type, false);
4357 let n_layers = ty.unwrap_list_n_layers();
4358 plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4359 }
4360 ty => sql_bail!(
4361 "cannot subscript type {}",
4362 ecx.humanize_sql_scalar_type(ty, false)
4363 ),
4364 }
4365}
4366
4367fn extract_scalar_subscript_from_positions<'a>(
4371 positions: &'a [SubscriptPosition<Aug>],
4372 expr_type_name: &str,
4373) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4374 let mut scalar_subscripts = Vec::with_capacity(positions.len());
4375 for p in positions {
4376 if p.explicit_slice {
4377 sql_bail!("{} subscript does not support slices", expr_type_name);
4378 }
4379 assert!(
4380 p.end.is_none(),
4381 "index-appearing subscripts cannot have end value"
4382 );
4383 scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4384 }
4385 Ok(scalar_subscripts)
4386}
4387
4388fn plan_subscript_array(
4389 ecx: &ExprContext,
4390 expr: HirScalarExpr,
4391 positions: &[SubscriptPosition<Aug>],
4392 offset: i64,
4393) -> Result<CoercibleScalarExpr, PlanError> {
4394 let mut exprs = Vec::with_capacity(positions.len() + 1);
4395 exprs.push(expr);
4396
4397 let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4400
4401 for i in indexes {
4402 exprs.push(plan_expr(ecx, i)?.cast_to(
4403 ecx,
4404 CastContext::Explicit,
4405 &SqlScalarType::Int64,
4406 )?);
4407 }
4408
4409 Ok(HirScalarExpr::call_variadic(ArrayIndex { offset }, exprs).into())
4410}
4411
4412fn plan_subscript_list(
4413 ecx: &ExprContext,
4414 mut expr: HirScalarExpr,
4415 positions: &[SubscriptPosition<Aug>],
4416 mut remaining_layers: usize,
4417 elem_type_name: &str,
4418) -> Result<CoercibleScalarExpr, PlanError> {
4419 let mut i = 0;
4420
4421 while i < positions.len() {
4422 let j = positions[i..]
4424 .iter()
4425 .position(|p| p.explicit_slice)
4426 .unwrap_or(positions.len() - i);
4427 if j != 0 {
4428 let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4429 let (n, e) = plan_index_list(
4430 ecx,
4431 expr,
4432 indexes.as_slice(),
4433 remaining_layers,
4434 elem_type_name,
4435 )?;
4436 remaining_layers = n;
4437 expr = e;
4438 i += j;
4439 }
4440
4441 let j = positions[i..]
4443 .iter()
4444 .position(|p| !p.explicit_slice)
4445 .unwrap_or(positions.len() - i);
4446 if j != 0 {
4447 expr = plan_slice_list(
4448 ecx,
4449 expr,
4450 &positions[i..i + j],
4451 remaining_layers,
4452 elem_type_name,
4453 )?;
4454 i += j;
4455 }
4456 }
4457
4458 Ok(expr.into())
4459}
4460
4461fn plan_index_list(
4462 ecx: &ExprContext,
4463 expr: HirScalarExpr,
4464 indexes: &[&Expr<Aug>],
4465 n_layers: usize,
4466 elem_type_name: &str,
4467) -> Result<(usize, HirScalarExpr), PlanError> {
4468 let depth = indexes.len();
4469
4470 if depth > n_layers {
4471 if n_layers == 0 {
4472 sql_bail!("cannot subscript type {}", elem_type_name)
4473 } else {
4474 sql_bail!(
4475 "cannot index into {} layers; list only has {} layer{}",
4476 depth,
4477 n_layers,
4478 if n_layers == 1 { "" } else { "s" }
4479 )
4480 }
4481 }
4482
4483 let mut exprs = Vec::with_capacity(depth + 1);
4484 exprs.push(expr);
4485
4486 for i in indexes {
4487 exprs.push(plan_expr(ecx, i)?.cast_to(
4488 ecx,
4489 CastContext::Explicit,
4490 &SqlScalarType::Int64,
4491 )?);
4492 }
4493
4494 Ok((
4495 n_layers - depth,
4496 HirScalarExpr::call_variadic(ListIndex, exprs),
4497 ))
4498}
4499
4500fn plan_slice_list(
4501 ecx: &ExprContext,
4502 expr: HirScalarExpr,
4503 slices: &[SubscriptPosition<Aug>],
4504 n_layers: usize,
4505 elem_type_name: &str,
4506) -> Result<HirScalarExpr, PlanError> {
4507 if n_layers == 0 {
4508 sql_bail!("cannot subscript type {}", elem_type_name)
4509 }
4510
4511 let mut exprs = Vec::with_capacity(slices.len() + 1);
4513 exprs.push(expr);
4514 let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4516 Ok(match position {
4517 Some(p) => {
4518 plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4519 }
4520 None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4521 })
4522 };
4523 for p in slices {
4524 let start = extract_position_or_default(p.start.as_ref(), 1)?;
4525 let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4526 exprs.push(start);
4527 exprs.push(end);
4528 }
4529
4530 Ok(HirScalarExpr::call_variadic(ListSliceLinear, exprs))
4531}
4532
4533fn plan_like(
4534 ecx: &ExprContext,
4535 expr: &Expr<Aug>,
4536 pattern: &Expr<Aug>,
4537 escape: Option<&Expr<Aug>>,
4538 case_insensitive: bool,
4539 not: bool,
4540) -> Result<HirScalarExpr, PlanError> {
4541 use CastContext::Implicit;
4542 let ecx = ecx.with_name("LIKE argument");
4543 let expr = plan_expr(&ecx, expr)?;
4544 let haystack = match ecx.scalar_type(&expr) {
4545 CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4546 .type_as(&ecx, ty)?
4547 .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4548 _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4549 };
4550 let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4551 if let Some(escape) = escape {
4552 pattern = pattern.call_binary(
4553 plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4554 expr_func::LikeEscape,
4555 );
4556 }
4557 let func: BinaryFunc = if case_insensitive {
4558 expr_func::IsLikeMatchCaseInsensitive.into()
4559 } else {
4560 expr_func::IsLikeMatchCaseSensitive.into()
4561 };
4562 let like = haystack.call_binary(pattern, func);
4563 if not {
4564 Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4565 } else {
4566 Ok(like)
4567 }
4568}
4569
4570fn plan_subscript_jsonb(
4571 ecx: &ExprContext,
4572 expr: HirScalarExpr,
4573 positions: &[SubscriptPosition<Aug>],
4574) -> Result<CoercibleScalarExpr, PlanError> {
4575 use CastContext::Implicit;
4576 use SqlScalarType::{Int64, String};
4577
4578 let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4581
4582 let mut exprs = Vec::with_capacity(subscripts.len());
4583 for s in subscripts {
4584 let subscript = plan_expr(ecx, s)?;
4585 let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4586 subscript
4587 } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4588 typeconv::to_string(ecx, subscript)?
4592 } else {
4593 sql_bail!("jsonb subscript type must be coercible to integer or text");
4594 };
4595 exprs.push(subscript);
4596 }
4597
4598 let expr = expr.call_binary(
4601 HirScalarExpr::call_variadic(
4602 ArrayCreate {
4603 elem_type: SqlScalarType::String,
4604 },
4605 exprs,
4606 ),
4607 expr_func::JsonbGetPath,
4608 );
4609 Ok(expr.into())
4610}
4611
4612fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4613 if !ecx.allow_subqueries {
4614 sql_bail!("{} does not allow subqueries", ecx.name)
4615 }
4616 let mut qcx = ecx.derived_query_context();
4617 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4618 Ok(expr.exists().into())
4619}
4620
4621fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4622 if !ecx.allow_subqueries {
4623 sql_bail!("{} does not allow subqueries", ecx.name)
4624 }
4625 let mut qcx = ecx.derived_query_context();
4626 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4627 let column_types = qcx.relation_type(&expr).column_types;
4628 if column_types.len() != 1 {
4629 sql_bail!(
4630 "Expected subselect to return 1 column, got {} columns",
4631 column_types.len()
4632 );
4633 }
4634 Ok(expr.select().into())
4635}
4636
4637fn plan_list_subquery(
4638 ecx: &ExprContext,
4639 query: &Query<Aug>,
4640) -> Result<CoercibleScalarExpr, PlanError> {
4641 plan_vector_like_subquery(
4642 ecx,
4643 query,
4644 |_| false,
4645 |elem_type| ListCreate { elem_type }.into(),
4646 |order_by| AggregateFunc::ListConcat { order_by },
4647 expr_func::ListListConcat.into(),
4648 |elem_type| {
4649 HirScalarExpr::literal(
4650 Datum::empty_list(),
4651 SqlScalarType::List {
4652 element_type: Box::new(elem_type),
4653 custom_id: None,
4654 },
4655 )
4656 },
4657 "list",
4658 )
4659}
4660
4661fn plan_array_subquery(
4662 ecx: &ExprContext,
4663 query: &Query<Aug>,
4664) -> Result<CoercibleScalarExpr, PlanError> {
4665 plan_vector_like_subquery(
4666 ecx,
4667 query,
4668 |elem_type| {
4669 matches!(
4670 elem_type,
4671 SqlScalarType::Char { .. }
4672 | SqlScalarType::Array { .. }
4673 | SqlScalarType::List { .. }
4674 | SqlScalarType::Map { .. }
4675 )
4676 },
4677 |elem_type| ArrayCreate { elem_type }.into(),
4678 |order_by| AggregateFunc::ArrayConcat { order_by },
4679 expr_func::ArrayArrayConcat.into(),
4680 |elem_type| {
4681 HirScalarExpr::literal(
4682 Datum::empty_array(),
4683 SqlScalarType::Array(Box::new(elem_type)),
4684 )
4685 },
4686 "[]",
4687 )
4688}
4689
4690fn plan_vector_like_subquery<F1, F2, F3, F4>(
4692 ecx: &ExprContext,
4693 query: &Query<Aug>,
4694 is_unsupported_type: F1,
4695 vector_create: F2,
4696 aggregate_concat: F3,
4697 binary_concat: BinaryFunc,
4698 empty_literal: F4,
4699 vector_type_string: &str,
4700) -> Result<CoercibleScalarExpr, PlanError>
4701where
4702 F1: Fn(&SqlScalarType) -> bool,
4703 F2: Fn(SqlScalarType) -> VariadicFunc,
4704 F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4705 F4: Fn(SqlScalarType) -> HirScalarExpr,
4706{
4707 if !ecx.allow_subqueries {
4708 sql_bail!("{} does not allow subqueries", ecx.name)
4709 }
4710
4711 let mut qcx = ecx.derived_query_context();
4712 let mut planned_query = plan_query(&mut qcx, query)?;
4713 if planned_query.limit.is_some()
4714 || !planned_query
4715 .offset
4716 .clone()
4717 .try_into_literal_int64()
4718 .is_ok_and(|offset| offset == 0)
4719 {
4720 planned_query.expr = HirRelationExpr::top_k(
4721 planned_query.expr,
4722 vec![],
4723 planned_query.order_by.clone(),
4724 planned_query.limit,
4725 planned_query.offset,
4726 planned_query.group_size_hints.limit_input_group_size,
4727 );
4728 }
4729
4730 if planned_query.project.len() != 1 {
4731 sql_bail!(
4732 "Expected subselect to return 1 column, got {} columns",
4733 planned_query.project.len()
4734 );
4735 }
4736
4737 let project_column = *planned_query.project.get(0).unwrap();
4738 let elem_type = qcx
4739 .relation_type(&planned_query.expr)
4740 .column_types
4741 .get(project_column)
4742 .cloned()
4743 .unwrap()
4744 .scalar_type();
4745
4746 if is_unsupported_type(&elem_type) {
4747 bail_unsupported!(format!(
4748 "cannot build array from subquery because return type {}{}",
4749 ecx.humanize_sql_scalar_type(&elem_type, false),
4750 vector_type_string
4751 ));
4752 }
4753
4754 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4757 vector_create(elem_type.clone()),
4758 vec![HirScalarExpr::column(project_column)],
4759 ))
4760 .chain(
4761 planned_query
4762 .order_by
4763 .iter()
4764 .map(|co| HirScalarExpr::column(co.column)),
4765 )
4766 .collect();
4767
4768 let aggregation_projection = vec![0];
4772 let aggregation_order_by = planned_query
4773 .order_by
4774 .into_iter()
4775 .enumerate()
4776 .map(|(i, order)| ColumnOrder { column: i, ..order })
4777 .collect();
4778
4779 let reduced_expr = planned_query
4780 .expr
4781 .reduce(
4782 vec![],
4783 vec![AggregateExpr {
4784 func: aggregate_concat(aggregation_order_by),
4785 expr: Box::new(HirScalarExpr::call_variadic(
4786 RecordCreate {
4787 field_names: iter::repeat(ColumnName::from(""))
4788 .take(aggregation_exprs.len())
4789 .collect(),
4790 },
4791 aggregation_exprs,
4792 )),
4793 distinct: false,
4794 }],
4795 None,
4796 )
4797 .project(aggregation_projection);
4798
4799 Ok(reduced_expr
4801 .select()
4802 .call_binary(empty_literal(elem_type), binary_concat)
4803 .into())
4804}
4805
4806fn plan_map_subquery(
4807 ecx: &ExprContext,
4808 query: &Query<Aug>,
4809) -> Result<CoercibleScalarExpr, PlanError> {
4810 if !ecx.allow_subqueries {
4811 sql_bail!("{} does not allow subqueries", ecx.name)
4812 }
4813
4814 let mut qcx = ecx.derived_query_context();
4815 let mut query = plan_query(&mut qcx, query)?;
4816 if query.limit.is_some()
4817 || !query
4818 .offset
4819 .clone()
4820 .try_into_literal_int64()
4821 .is_ok_and(|offset| offset == 0)
4822 {
4823 query.expr = HirRelationExpr::top_k(
4824 query.expr,
4825 vec![],
4826 query.order_by.clone(),
4827 query.limit,
4828 query.offset,
4829 query.group_size_hints.limit_input_group_size,
4830 );
4831 }
4832 if query.project.len() != 2 {
4833 sql_bail!(
4834 "expected map subquery to return 2 columns, got {} columns",
4835 query.project.len()
4836 );
4837 }
4838
4839 let query_types = qcx.relation_type(&query.expr).column_types;
4840 let key_column = query.project[0];
4841 let key_type = query_types[key_column].clone().scalar_type();
4842 let value_column = query.project[1];
4843 let value_type = query_types[value_column].clone().scalar_type();
4844
4845 if key_type != SqlScalarType::String {
4846 sql_bail!("cannot build map from subquery because first column is not of type text");
4847 }
4848
4849 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4850 RecordCreate {
4851 field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4852 },
4853 vec![
4854 HirScalarExpr::column(key_column),
4855 HirScalarExpr::column(value_column),
4856 ],
4857 ))
4858 .chain(
4859 query
4860 .order_by
4861 .iter()
4862 .map(|co| HirScalarExpr::column(co.column)),
4863 )
4864 .collect();
4865
4866 let expr = query
4867 .expr
4868 .reduce(
4869 vec![],
4870 vec![AggregateExpr {
4871 func: AggregateFunc::MapAgg {
4872 order_by: query
4873 .order_by
4874 .into_iter()
4875 .enumerate()
4876 .map(|(i, order)| ColumnOrder { column: i, ..order })
4877 .collect(),
4878 value_type: value_type.clone(),
4879 },
4880 expr: Box::new(HirScalarExpr::call_variadic(
4881 RecordCreate {
4882 field_names: iter::repeat(ColumnName::from(""))
4883 .take(aggregation_exprs.len())
4884 .collect(),
4885 },
4886 aggregation_exprs,
4887 )),
4888 distinct: false,
4889 }],
4890 None,
4891 )
4892 .project(vec![0]);
4893
4894 let expr = HirScalarExpr::call_variadic(
4896 Coalesce,
4897 vec![
4898 expr.select(),
4899 HirScalarExpr::literal(
4900 Datum::empty_map(),
4901 SqlScalarType::Map {
4902 value_type: Box::new(value_type),
4903 custom_id: None,
4904 },
4905 ),
4906 ],
4907 );
4908
4909 Ok(expr.into())
4910}
4911
4912fn plan_collate(
4913 ecx: &ExprContext,
4914 expr: &Expr<Aug>,
4915 collation: &UnresolvedItemName,
4916) -> Result<CoercibleScalarExpr, PlanError> {
4917 if collation.0.len() == 2
4918 && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4919 && collation.0[1] == ident!("default")
4920 {
4921 plan_expr(ecx, expr)
4922 } else {
4923 bail_unsupported!("COLLATE");
4924 }
4925}
4926
4927fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4934where
4935 E: std::borrow::Borrow<Expr<Aug>>,
4936{
4937 let mut out = vec![];
4938 for expr in exprs {
4939 out.push(plan_expr(ecx, expr.borrow())?);
4940 }
4941 Ok(out)
4942}
4943
4944fn plan_array(
4946 ecx: &ExprContext,
4947 exprs: &[Expr<Aug>],
4948 type_hint: Option<&SqlScalarType>,
4949) -> Result<CoercibleScalarExpr, PlanError> {
4950 let mut out = vec![];
4952 for expr in exprs {
4953 out.push(match expr {
4954 Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4957 _ => plan_expr(ecx, expr)?,
4958 });
4959 }
4960
4961 let type_hint = match type_hint {
4963 Some(SqlScalarType::Array(elem_type)) => {
4968 let multidimensional = out.iter().any(|e| {
4969 matches!(
4970 ecx.scalar_type(e),
4971 CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4972 )
4973 });
4974 if multidimensional {
4975 type_hint
4976 } else {
4977 Some(&**elem_type)
4978 }
4979 }
4980 Some(_) => None,
4984 None => None,
4986 };
4987
4988 let (elem_type, exprs) = if exprs.is_empty() {
4990 if let Some(elem_type) = type_hint {
4991 (elem_type.clone(), vec![])
4992 } else {
4993 sql_bail!("cannot determine type of empty array");
4994 }
4995 } else {
4996 let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4997 (ecx.scalar_type(&out[0]), out)
4998 };
4999
5000 if matches!(
5006 elem_type,
5007 SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
5008 ) {
5009 bail_unsupported!(format!(
5010 "{}[]",
5011 ecx.humanize_sql_scalar_type(&elem_type, false)
5012 ));
5013 }
5014
5015 Ok(HirScalarExpr::call_variadic(ArrayCreate { elem_type }, exprs).into())
5016}
5017
5018fn plan_list(
5019 ecx: &ExprContext,
5020 exprs: &[Expr<Aug>],
5021 type_hint: Option<&SqlScalarType>,
5022) -> Result<CoercibleScalarExpr, PlanError> {
5023 let (elem_type, exprs) = if exprs.is_empty() {
5024 if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
5025 (element_type.without_modifiers(), vec![])
5026 } else {
5027 sql_bail!("cannot determine type of empty list");
5028 }
5029 } else {
5030 let type_hint = match type_hint {
5031 Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
5032 _ => None,
5033 };
5034
5035 let mut out = vec![];
5036 for expr in exprs {
5037 out.push(match expr {
5038 Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
5041 _ => plan_expr(ecx, expr)?,
5042 });
5043 }
5044 let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
5045 (ecx.scalar_type(&out[0]).without_modifiers(), out)
5046 };
5047
5048 if matches!(elem_type, SqlScalarType::Char { .. }) {
5049 bail_unsupported!("char list");
5050 }
5051
5052 Ok(HirScalarExpr::call_variadic(ListCreate { elem_type }, exprs).into())
5053}
5054
5055fn plan_map(
5056 ecx: &ExprContext,
5057 entries: &[MapEntry<Aug>],
5058 type_hint: Option<&SqlScalarType>,
5059) -> Result<CoercibleScalarExpr, PlanError> {
5060 let (value_type, exprs) = if entries.is_empty() {
5061 if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
5062 (value_type.without_modifiers(), vec![])
5063 } else {
5064 sql_bail!("cannot determine type of empty map");
5065 }
5066 } else {
5067 let type_hint = match type_hint {
5068 Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5069 _ => None,
5070 };
5071
5072 let mut keys = vec![];
5073 let mut values = vec![];
5074 for MapEntry { key, value } in entries {
5075 let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5076 let value = match value {
5077 Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5080 _ => plan_expr(ecx, value)?,
5081 };
5082 keys.push(key);
5083 values.push(value);
5084 }
5085 let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5086 let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5087 let out = itertools::interleave(keys, values).collect();
5088 (value_type, out)
5089 };
5090
5091 if matches!(value_type, SqlScalarType::Char { .. }) {
5092 bail_unsupported!("char map");
5093 }
5094
5095 let expr = HirScalarExpr::call_variadic(MapBuild { value_type }, exprs);
5096 Ok(expr.into())
5097}
5098
5099pub fn coerce_homogeneous_exprs(
5116 ecx: &ExprContext,
5117 exprs: Vec<CoercibleScalarExpr>,
5118 force_type: Option<&SqlScalarType>,
5119) -> Result<Vec<HirScalarExpr>, PlanError> {
5120 assert!(!exprs.is_empty());
5121
5122 let target_holder;
5123 let target = match force_type {
5124 Some(t) => t,
5125 None => {
5126 let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5127 target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5128 &target_holder
5129 }
5130 };
5131
5132 let mut out = Vec::new();
5134 for expr in exprs {
5135 let arg = typeconv::plan_coerce(ecx, expr, target)?;
5136 let ccx = match force_type {
5137 None => CastContext::Implicit,
5138 Some(_) => CastContext::Explicit,
5139 };
5140 match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5141 Ok(expr) => out.push(expr),
5142 Err(_) => sql_bail!(
5143 "{} could not convert type {} to {}",
5144 ecx.name,
5145 ecx.humanize_sql_scalar_type(&ecx.scalar_type(&arg), false),
5146 ecx.humanize_sql_scalar_type(target, false),
5147 ),
5148 }
5149 }
5150 Ok(out)
5151}
5152
5153pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5156 obe: &OrderByExpr<T>,
5157 column: usize,
5158) -> ColumnOrder {
5159 let desc = !obe.asc.unwrap_or(true);
5160 ColumnOrder {
5161 column,
5162 desc,
5163 nulls_last: obe.nulls_last.unwrap_or(!desc),
5166 }
5167}
5168
5169fn plan_function_order_by(
5177 ecx: &ExprContext,
5178 order_by: &[OrderByExpr<Aug>],
5179) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5180 let mut order_by_exprs = vec![];
5181 let mut col_orders = vec![];
5182 {
5183 for (i, obe) in order_by.iter().enumerate() {
5184 let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5188 order_by_exprs.push(expr);
5189 col_orders.push(resolve_desc_and_nulls_last(obe, i));
5190 }
5191 }
5192 Ok((order_by_exprs, col_orders))
5193}
5194
5195fn humanize_or_debug(scx: &StatementContext, name: &ResolvedItemName) -> String {
5200 scx.humanize_resolved_name(name)
5201 .map(|n| n.to_string())
5202 .unwrap_or_else(|_| format!("<error when trying to humanize `{name:?}`>"))
5203}
5204
5205fn plan_aggregate_common(
5207 ecx: &ExprContext,
5208 Function::<Aug> {
5209 name,
5210 args,
5211 filter,
5212 over: _,
5213 distinct,
5214 }: &Function<Aug>,
5215) -> Result<AggregateExpr, PlanError> {
5216 let impls = match resolve_func(ecx, name, args)? {
5231 Func::Aggregate(impls) => impls,
5232 _ => bail_internal!("plan_aggregate_common called on non-aggregate function"),
5233 };
5234
5235 let (args, order_by) = match &args {
5244 FunctionArgs::Star => (vec![], vec![]),
5245 FunctionArgs::Args { args, order_by } => {
5246 if args.is_empty() {
5247 sql_bail!(
5248 "{}(*) must be used to call a parameterless aggregate function",
5249 humanize_or_debug(ecx.qcx.scx, name)
5250 );
5251 }
5252 let args = plan_exprs(ecx, args)?;
5253 (args, order_by.clone())
5254 }
5255 };
5256
5257 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5258
5259 let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5260 if let Some(filter) = &filter {
5261 let cond =
5271 plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5272 let expr_typ = ecx.scalar_type(&expr);
5273 expr = HirScalarExpr::if_then_else(
5274 cond,
5275 expr,
5276 HirScalarExpr::literal(func.identity_datum(), expr_typ),
5277 );
5278 }
5279
5280 let mut seen_outer = false;
5281 let mut seen_inner = false;
5282 #[allow(deprecated)]
5283 expr.visit_columns(0, &mut |depth, col| {
5284 if depth == 0 && col.level == 0 {
5285 seen_inner = true;
5286 } else if col.level > depth {
5287 seen_outer = true;
5288 }
5289 });
5290 if seen_outer && !seen_inner {
5291 bail_unsupported!(
5292 3720,
5293 "aggregate functions that refer exclusively to outer columns"
5294 );
5295 }
5296
5297 if func.is_order_sensitive() {
5300 let field_names = iter::repeat(ColumnName::from(""))
5301 .take(1 + order_by_exprs.len())
5302 .collect();
5303 let mut exprs = vec![expr];
5304 exprs.extend(order_by_exprs);
5305 expr = HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs);
5306 }
5307
5308 Ok(AggregateExpr {
5309 func,
5310 expr: Box::new(expr),
5311 distinct: *distinct,
5312 })
5313}
5314
5315fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
5316 let mut names = names.to_vec();
5317 let Some(last) = names.pop() else {
5320 bail_internal!("empty identifier");
5321 };
5322 let col_name = normalize::column_name(last);
5323
5324 if !names.is_empty() {
5326 let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
5327 let (i, i_name) = ecx.scope.resolve_table_column(
5328 &ecx.qcx.outer_scopes,
5329 &table_name,
5330 &col_name,
5331 &mut ecx.qcx.name_manager.borrow_mut(),
5332 )?;
5333 return Ok(HirScalarExpr::named_column(i, i_name));
5334 }
5335
5336 let similar_names = match ecx.scope.resolve_column(
5339 &ecx.qcx.outer_scopes,
5340 &col_name,
5341 &mut ecx.qcx.name_manager.borrow_mut(),
5342 ) {
5343 Ok((i, i_name)) => {
5344 return Ok(HirScalarExpr::named_column(i, i_name));
5345 }
5346 Err(PlanError::UnknownColumn { similar, .. }) => similar,
5347 Err(e) => return Err(e),
5348 };
5349
5350 let items = ecx.scope.items_from_table(
5353 &ecx.qcx.outer_scopes,
5354 &PartialItemName {
5355 database: None,
5356 schema: None,
5357 item: col_name.as_str().to_owned(),
5358 },
5359 )?;
5360 match items.as_slice() {
5361 [] => Err(PlanError::UnknownColumn {
5363 table: None,
5364 column: col_name,
5365 similar: similar_names,
5366 }),
5367 [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
5372 *column,
5373 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5374 )),
5375 _ => {
5378 let mut has_exists_column = None;
5379 let (exprs, field_names): (Vec<_>, Vec<_>) = items
5380 .into_iter()
5381 .filter_map(|(column, item)| {
5382 if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
5383 has_exists_column = Some(column);
5384 None
5385 } else {
5386 let expr = HirScalarExpr::named_column(
5387 column,
5388 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5389 );
5390 let name = item.column_name.clone();
5391 Some((expr, name))
5392 }
5393 })
5394 .unzip();
5395 let expr = if exprs.len() == 1 && has_exists_column.is_some() {
5397 exprs.into_element()
5398 } else {
5399 HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs)
5400 };
5401 if let Some(has_exists_column) = has_exists_column {
5402 Ok(HirScalarExpr::if_then_else(
5403 HirScalarExpr::unnamed_column(has_exists_column)
5404 .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
5405 HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
5406 expr,
5407 ))
5408 } else {
5409 Ok(expr)
5410 }
5411 }
5412 }
5413}
5414
5415fn plan_op(
5416 ecx: &ExprContext,
5417 op: &str,
5418 expr1: &Expr<Aug>,
5419 expr2: Option<&Expr<Aug>>,
5420) -> Result<HirScalarExpr, PlanError> {
5421 let impls = func::resolve_op(op)?;
5422 let args = match expr2 {
5423 None => plan_exprs(ecx, &[expr1])?,
5424 Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5425 };
5426 func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5427}
5428
5429fn plan_function<'a>(
5430 ecx: &ExprContext,
5431 f @ Function {
5432 name,
5433 args,
5434 filter,
5435 over,
5436 distinct,
5437 }: &'a Function<Aug>,
5438) -> Result<HirScalarExpr, PlanError> {
5439 let impls = match resolve_func(ecx, name, args)? {
5440 Func::Table(_) => {
5441 sql_bail!(
5442 "table functions are not allowed in {} (function {})",
5443 ecx.name,
5444 name
5445 );
5446 }
5447 Func::Scalar(impls) => {
5448 if over.is_some() {
5449 sql_bail!(
5450 "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
5451 );
5452 }
5453 impls
5454 }
5455 Func::ScalarWindow(impls) => {
5456 let (
5457 ignore_nulls,
5458 order_by_exprs,
5459 col_orders,
5460 _window_frame,
5461 partition_by,
5462 scalar_args,
5463 ) = plan_window_function_non_aggr(ecx, f)?;
5464
5465 if !scalar_args.is_empty() {
5469 if let ResolvedItemName::Item {
5470 full_name: FullItemName { item, .. },
5471 ..
5472 } = name
5473 {
5474 sql_bail!(
5475 "function {} has 0 parameters, but was called with {}",
5476 item,
5477 scalar_args.len()
5478 );
5479 }
5480 }
5481
5482 let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5487
5488 if ignore_nulls {
5489 bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5492 }
5493
5494 return Ok(HirScalarExpr::windowing(WindowExpr {
5495 func: WindowExprType::Scalar(ScalarWindowExpr {
5496 func,
5497 order_by: col_orders,
5498 }),
5499 partition_by,
5500 order_by: order_by_exprs,
5501 }));
5502 }
5503 Func::ValueWindow(impls) => {
5504 let window_plan = plan_window_function_non_aggr(ecx, f)?;
5505 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, win_args) =
5506 window_plan;
5507
5508 let (args_encoded, func) =
5509 func::select_impl(ecx, FuncSpec::Func(name), impls, win_args, vec![])?;
5510
5511 if ignore_nulls {
5512 match func {
5513 ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
5514 _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
5515 }
5516 }
5517
5518 return Ok(HirScalarExpr::windowing(WindowExpr {
5519 func: WindowExprType::Value(ValueWindowExpr {
5520 func,
5521 args: Box::new(args_encoded),
5522 order_by: col_orders,
5523 window_frame,
5524 ignore_nulls, }),
5526 partition_by,
5527 order_by: order_by_exprs,
5528 }));
5529 }
5530 Func::Aggregate(_) => {
5531 if f.over.is_none() {
5532 if ecx.allow_aggregates {
5534 sql_bail!(
5537 "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
5538 name,
5539 );
5540 } else {
5541 sql_bail!(
5544 "aggregate functions are not allowed in {} (function {})",
5545 ecx.name,
5546 name
5547 );
5548 }
5549 } else {
5550 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
5551 plan_window_function_common(ecx, &f.name, &f.over)?;
5552
5553 match (&window_frame.start_bound, &window_frame.end_bound) {
5555 (
5556 mz_expr::WindowFrameBound::UnboundedPreceding,
5557 mz_expr::WindowFrameBound::OffsetPreceding(..),
5558 )
5559 | (
5560 mz_expr::WindowFrameBound::UnboundedPreceding,
5561 mz_expr::WindowFrameBound::OffsetFollowing(..),
5562 )
5563 | (
5564 mz_expr::WindowFrameBound::OffsetPreceding(..),
5565 mz_expr::WindowFrameBound::UnboundedFollowing,
5566 )
5567 | (
5568 mz_expr::WindowFrameBound::OffsetFollowing(..),
5569 mz_expr::WindowFrameBound::UnboundedFollowing,
5570 ) => bail_unsupported!("mixed unbounded - offset frames"),
5571 (_, _) => {} }
5573
5574 if ignore_nulls {
5575 bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5579 }
5580
5581 let aggregate_expr = plan_aggregate_common(ecx, f)?;
5582
5583 if aggregate_expr.distinct {
5584 bail_unsupported!("DISTINCT in window aggregates");
5586 }
5587
5588 return Ok(HirScalarExpr::windowing(WindowExpr {
5589 func: WindowExprType::Aggregate(AggregateWindowExpr {
5590 aggregate_expr,
5591 order_by: col_orders,
5592 window_frame,
5593 }),
5594 partition_by,
5595 order_by: order_by_exprs,
5596 }));
5597 }
5598 }
5599 };
5600
5601 if over.is_some() {
5602 bail_internal!("OVER clause should have been handled by the window function path above");
5603 }
5604
5605 if *distinct {
5606 sql_bail!(
5607 "DISTINCT specified, but {} is not an aggregate function",
5608 humanize_or_debug(ecx.qcx.scx, name)
5609 );
5610 }
5611 if filter.is_some() {
5612 sql_bail!(
5613 "FILTER specified, but {} is not an aggregate function",
5614 humanize_or_debug(ecx.qcx.scx, name)
5615 );
5616 }
5617
5618 let scalar_args = match &args {
5619 FunctionArgs::Star => {
5620 sql_bail!(
5621 "* argument is invalid with non-aggregate function {}",
5622 humanize_or_debug(ecx.qcx.scx, name)
5623 )
5624 }
5625 FunctionArgs::Args { args, order_by } => {
5626 if !order_by.is_empty() {
5627 sql_bail!(
5628 "ORDER BY specified, but {} is not an aggregate function",
5629 humanize_or_debug(ecx.qcx.scx, name)
5630 );
5631 }
5632 plan_exprs(ecx, args)?
5633 }
5634 };
5635
5636 func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
5637}
5638
5639pub const IGNORE_NULLS_ERROR_MSG: &str =
5640 "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5641
5642pub fn resolve_func(
5646 ecx: &ExprContext,
5647 name: &ResolvedItemName,
5648 args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5649) -> Result<&'static Func, PlanError> {
5650 if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5651 if let Ok(f) = i.func() {
5652 return Ok(f);
5653 }
5654 }
5655
5656 let cexprs = match args {
5659 mz_sql_parser::ast::FunctionArgs::Star => vec![],
5660 mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5661 if !order_by.is_empty() {
5662 sql_bail!(
5663 "ORDER BY specified, but {} is not an aggregate function",
5664 name
5665 );
5666 }
5667 plan_exprs(ecx, args)?
5668 }
5669 };
5670
5671 let arg_types: Vec<_> = cexprs
5672 .into_iter()
5673 .map(|ty| match ecx.scalar_type(&ty) {
5674 CoercibleScalarType::Coerced(ty) => ecx.humanize_sql_scalar_type(&ty, false),
5675 CoercibleScalarType::Record(_) => "record".to_string(),
5676 CoercibleScalarType::Uncoerced => "unknown".to_string(),
5677 })
5678 .collect();
5679
5680 Err(PlanError::UnknownFunction {
5681 name: name.to_string(),
5682 arg_types,
5683 })
5684}
5685
5686fn plan_is_expr<'a>(
5687 ecx: &ExprContext,
5688 expr: &'a Expr<Aug>,
5689 construct: &IsExprConstruct<Aug>,
5690 not: bool,
5691) -> Result<HirScalarExpr, PlanError> {
5692 let expr_hir = plan_expr(ecx, expr)?;
5693
5694 let mut result = match construct {
5695 IsExprConstruct::Null => {
5696 expr_hir.type_as_any(ecx)?.call_is_null()
5701 }
5702 IsExprConstruct::Unknown => expr_hir.type_as(ecx, &SqlScalarType::Bool)?.call_is_null(),
5703 IsExprConstruct::True => expr_hir
5704 .type_as(ecx, &SqlScalarType::Bool)?
5705 .call_unary(UnaryFunc::IsTrue(expr_func::IsTrue)),
5706 IsExprConstruct::False => expr_hir
5707 .type_as(ecx, &SqlScalarType::Bool)?
5708 .call_unary(UnaryFunc::IsFalse(expr_func::IsFalse)),
5709 IsExprConstruct::DistinctFrom(expr2) => {
5710 let ne_ast = expr.clone().not_equals(expr2.as_ref().clone());
5721 let ne_hir = plan_expr(ecx, &ne_ast)?.type_as_any(ecx)?;
5722
5723 let expr1_hir = expr_hir.type_as_any(ecx)?;
5724 let expr2_hir = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5725
5726 let term1 = HirScalarExpr::variadic_or(vec![
5727 ne_hir,
5728 expr1_hir.clone().call_is_null(),
5729 expr2_hir.clone().call_is_null(),
5730 ]);
5731 let term2 = HirScalarExpr::variadic_or(vec![
5732 expr1_hir.call_is_null().not(),
5733 expr2_hir.call_is_null().not(),
5734 ]);
5735 term1.and(term2)
5736 }
5737 };
5738 if not {
5739 result = result.not();
5740 }
5741 Ok(result)
5742}
5743
5744fn plan_case<'a>(
5745 ecx: &ExprContext,
5746 operand: &'a Option<Box<Expr<Aug>>>,
5747 conditions: &'a [Expr<Aug>],
5748 results: &'a [Expr<Aug>],
5749 else_result: &'a Option<Box<Expr<Aug>>>,
5750) -> Result<HirScalarExpr, PlanError> {
5751 let mut cond_exprs = Vec::new();
5752 let mut result_exprs = Vec::new();
5753 for (c, r) in conditions.iter().zip_eq(results) {
5754 let c = match operand {
5755 Some(operand) => operand.clone().equals(c.clone()),
5756 None => c.clone(),
5757 };
5758 let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5759 cond_exprs.push(cexpr);
5760 result_exprs.push(r);
5761 }
5762 result_exprs.push(match else_result {
5763 Some(else_result) => else_result,
5764 None => &Expr::Value(Value::Null),
5765 });
5766 let mut result_exprs = coerce_homogeneous_exprs(
5767 &ecx.with_name("CASE"),
5768 plan_exprs(ecx, &result_exprs)?,
5769 None,
5770 )?;
5771 let mut expr = result_exprs.pop().unwrap();
5772 assert_eq!(cond_exprs.len(), result_exprs.len());
5773 for (cexpr, rexpr) in cond_exprs
5774 .into_iter()
5775 .rev()
5776 .zip_eq(result_exprs.into_iter().rev())
5777 {
5778 expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5779 }
5780 Ok(expr)
5781}
5782
5783fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5784 let (datum, scalar_type) = match l {
5785 Value::Number(s) => {
5786 let d = strconv::parse_numeric(s.as_str())?;
5787 if !s.contains(&['E', '.'][..]) {
5788 if let Ok(n) = d.0.try_into() {
5790 (Datum::Int32(n), SqlScalarType::Int32)
5791 } else if let Ok(n) = d.0.try_into() {
5792 (Datum::Int64(n), SqlScalarType::Int64)
5793 } else {
5794 (
5795 Datum::Numeric(d),
5796 SqlScalarType::Numeric { max_scale: None },
5797 )
5798 }
5799 } else {
5800 (
5801 Datum::Numeric(d),
5802 SqlScalarType::Numeric { max_scale: None },
5803 )
5804 }
5805 }
5806 Value::HexString(_) => bail_unsupported!("hex string literals"),
5807 Value::Boolean(b) => match b {
5808 false => (Datum::False, SqlScalarType::Bool),
5809 true => (Datum::True, SqlScalarType::Bool),
5810 },
5811 Value::Interval(i) => {
5812 let i = literal::plan_interval(i)?;
5813 (Datum::Interval(i), SqlScalarType::Interval)
5814 }
5815 Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5816 Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5817 };
5818 let expr = HirScalarExpr::literal(datum, scalar_type);
5819 Ok(expr.into())
5820}
5821
5822fn plan_window_function_non_aggr<'a>(
5825 ecx: &ExprContext,
5826 Function {
5827 name,
5828 args,
5829 filter,
5830 over,
5831 distinct,
5832 }: &'a Function<Aug>,
5833) -> Result<
5834 (
5835 bool,
5836 Vec<HirScalarExpr>,
5837 Vec<ColumnOrder>,
5838 mz_expr::WindowFrame,
5839 Vec<HirScalarExpr>,
5840 Vec<CoercibleScalarExpr>,
5841 ),
5842 PlanError,
5843> {
5844 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5845 plan_window_function_common(ecx, name, over)?;
5846
5847 if *distinct {
5848 sql_bail!(
5849 "DISTINCT specified, but {} is not an aggregate function",
5850 name
5851 );
5852 }
5853
5854 if filter.is_some() {
5855 bail_unsupported!("FILTER in non-aggregate window functions");
5856 }
5857
5858 let scalar_args = match &args {
5859 FunctionArgs::Star => {
5860 sql_bail!("* argument is invalid with non-aggregate function {}", name)
5861 }
5862 FunctionArgs::Args { args, order_by } => {
5863 if !order_by.is_empty() {
5864 sql_bail!(
5865 "ORDER BY specified, but {} is not an aggregate function",
5866 name
5867 );
5868 }
5869 plan_exprs(ecx, args)?
5870 }
5871 };
5872
5873 Ok((
5874 ignore_nulls,
5875 order_by_exprs,
5876 col_orders,
5877 window_frame,
5878 partition,
5879 scalar_args,
5880 ))
5881}
5882
5883fn plan_window_function_common(
5885 ecx: &ExprContext,
5886 name: &<Aug as AstInfo>::ItemName,
5887 over: &Option<WindowSpec<Aug>>,
5888) -> Result<
5889 (
5890 bool,
5891 Vec<HirScalarExpr>,
5892 Vec<ColumnOrder>,
5893 mz_expr::WindowFrame,
5894 Vec<HirScalarExpr>,
5895 ),
5896 PlanError,
5897> {
5898 if !ecx.allow_windows {
5899 sql_bail!(
5900 "window functions are not allowed in {} (function {})",
5901 ecx.name,
5902 name
5903 );
5904 }
5905
5906 let window_spec = match over.as_ref() {
5907 Some(over) => over,
5908 None => sql_bail!("window function {} requires an OVER clause", name),
5909 };
5910 if window_spec.ignore_nulls && window_spec.respect_nulls {
5911 sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5912 }
5913 let window_frame = match window_spec.window_frame.as_ref() {
5914 Some(frame) => plan_window_frame(frame)?,
5915 None => mz_expr::WindowFrame::default(),
5916 };
5917 let mut partition = Vec::new();
5918 for expr in &window_spec.partition_by {
5919 partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5920 }
5921
5922 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5923
5924 Ok((
5925 window_spec.ignore_nulls,
5926 order_by_exprs,
5927 col_orders,
5928 window_frame,
5929 partition,
5930 ))
5931}
5932
5933fn plan_window_frame(
5934 WindowFrame {
5935 units,
5936 start_bound,
5937 end_bound,
5938 }: &WindowFrame,
5939) -> Result<mz_expr::WindowFrame, PlanError> {
5940 use mz_expr::WindowFrameBound::*;
5941 let units = window_frame_unit_ast_to_expr(units)?;
5942 let start_bound = window_frame_bound_ast_to_expr(start_bound);
5943 let end_bound = end_bound
5944 .as_ref()
5945 .map(window_frame_bound_ast_to_expr)
5946 .unwrap_or(CurrentRow);
5947
5948 match (&start_bound, &end_bound) {
5950 (UnboundedFollowing, _) => {
5952 sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5953 }
5954 (_, UnboundedPreceding) => {
5956 sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5957 }
5958 (CurrentRow, OffsetPreceding(_)) => {
5960 sql_bail!("frame starting from current row cannot have preceding rows")
5961 }
5962 (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5963 sql_bail!("frame starting from following row cannot have preceding rows")
5964 }
5965 (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5968 if *o1 > 1000000 || *o2 > 1000000 {
5972 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5973 }
5974 }
5975 (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5976 if *o1 > 1000000 || *o2 > 1000000 {
5977 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5978 }
5979 }
5980 (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5981 if *o1 > 1000000 || *o2 > 1000000 {
5982 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5983 }
5984 }
5985 (OffsetPreceding(o), CurrentRow) => {
5986 if *o > 1000000 {
5987 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5988 }
5989 }
5990 (CurrentRow, OffsetFollowing(o)) => {
5991 if *o > 1000000 {
5992 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5993 }
5994 }
5995 (_, _) => (),
5997 }
5998
5999 if units == mz_expr::WindowFrameUnits::Range
6002 && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
6003 {
6004 bail_unsupported!("RANGE in non-default window frames")
6005 }
6006
6007 let frame = mz_expr::WindowFrame {
6008 units,
6009 start_bound,
6010 end_bound,
6011 };
6012 Ok(frame)
6013}
6014
6015fn window_frame_unit_ast_to_expr(
6016 unit: &WindowFrameUnits,
6017) -> Result<mz_expr::WindowFrameUnits, PlanError> {
6018 match unit {
6019 WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
6020 WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
6021 WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
6022 }
6023}
6024
6025fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
6026 match bound {
6027 WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
6028 WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
6029 WindowFrameBound::Preceding(Some(offset)) => {
6030 mz_expr::WindowFrameBound::OffsetPreceding(*offset)
6031 }
6032 WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
6033 WindowFrameBound::Following(Some(offset)) => {
6034 mz_expr::WindowFrameBound::OffsetFollowing(*offset)
6035 }
6036 }
6037}
6038
6039pub fn scalar_type_from_sql(
6040 scx: &StatementContext,
6041 data_type: &ResolvedDataType,
6042) -> Result<SqlScalarType, PlanError> {
6043 match data_type {
6044 ResolvedDataType::AnonymousList(elem_type) => {
6045 let elem_type = scalar_type_from_sql(scx, elem_type)?;
6046 if matches!(elem_type, SqlScalarType::Char { .. }) {
6047 bail_unsupported!("char list");
6048 }
6049 Ok(SqlScalarType::List {
6050 element_type: Box::new(elem_type),
6051 custom_id: None,
6052 })
6053 }
6054 ResolvedDataType::AnonymousMap {
6055 key_type,
6056 value_type,
6057 } => {
6058 match scalar_type_from_sql(scx, key_type)? {
6059 SqlScalarType::String => {}
6060 other => sql_bail!(
6061 "map key type must be {}, got {}",
6062 scx.humanize_sql_scalar_type(&SqlScalarType::String, false),
6063 scx.humanize_sql_scalar_type(&other, false)
6064 ),
6065 }
6066 Ok(SqlScalarType::Map {
6067 value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
6068 custom_id: None,
6069 })
6070 }
6071 ResolvedDataType::Named { id, modifiers, .. } => {
6072 scalar_type_from_catalog(scx.catalog, *id, modifiers)
6073 }
6074 ResolvedDataType::Error => bail_internal!("should have been caught in name resolution"),
6075 }
6076}
6077
6078pub fn scalar_type_from_catalog(
6079 catalog: &dyn SessionCatalog,
6080 id: CatalogItemId,
6081 modifiers: &[i64],
6082) -> Result<SqlScalarType, PlanError> {
6083 let entry = catalog.get_item(&id);
6084 let type_details = match entry.type_details() {
6085 Some(type_details) => type_details,
6086 None => {
6087 sql_bail!(
6090 "internal error: {} does not refer to a type",
6091 catalog.resolve_full_name(entry.name()).to_string().quoted()
6092 );
6093 }
6094 };
6095 match &type_details.typ {
6096 CatalogType::Numeric => {
6097 let mut modifiers = modifiers.iter().fuse();
6098 let precision = match modifiers.next() {
6099 Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6100 sql_bail!(
6101 "precision for type numeric must be between 1 and {}",
6102 NUMERIC_DATUM_MAX_PRECISION,
6103 );
6104 }
6105 Some(p) => Some(*p),
6106 None => None,
6107 };
6108 let scale = match modifiers.next() {
6109 Some(scale) => {
6110 if let Some(precision) = precision {
6111 if *scale > precision {
6112 sql_bail!(
6113 "scale for type numeric must be between 0 and precision {}",
6114 precision
6115 );
6116 }
6117 }
6118 Some(NumericMaxScale::try_from(*scale)?)
6119 }
6120 None => None,
6121 };
6122 if modifiers.next().is_some() {
6123 sql_bail!("type numeric supports at most two type modifiers");
6124 }
6125 Ok(SqlScalarType::Numeric { max_scale: scale })
6126 }
6127 CatalogType::Char => {
6128 let mut modifiers = modifiers.iter().fuse();
6129 let length = match modifiers.next() {
6130 Some(l) => Some(CharLength::try_from(*l)?),
6131 None => Some(CharLength::ONE),
6132 };
6133 if modifiers.next().is_some() {
6134 sql_bail!("type character supports at most one type modifier");
6135 }
6136 Ok(SqlScalarType::Char { length })
6137 }
6138 CatalogType::VarChar => {
6139 let mut modifiers = modifiers.iter().fuse();
6140 let length = match modifiers.next() {
6141 Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6142 None => None,
6143 };
6144 if modifiers.next().is_some() {
6145 sql_bail!("type character varying supports at most one type modifier");
6146 }
6147 Ok(SqlScalarType::VarChar { max_length: length })
6148 }
6149 CatalogType::Timestamp => {
6150 let mut modifiers = modifiers.iter().fuse();
6151 let precision = match modifiers.next() {
6152 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6153 None => None,
6154 };
6155 if modifiers.next().is_some() {
6156 sql_bail!("type timestamp supports at most one type modifier");
6157 }
6158 Ok(SqlScalarType::Timestamp { precision })
6159 }
6160 CatalogType::TimestampTz => {
6161 let mut modifiers = modifiers.iter().fuse();
6162 let precision = match modifiers.next() {
6163 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6164 None => None,
6165 };
6166 if modifiers.next().is_some() {
6167 sql_bail!("type timestamp with time zone supports at most one type modifier");
6168 }
6169 Ok(SqlScalarType::TimestampTz { precision })
6170 }
6171 t => {
6172 if !modifiers.is_empty() {
6173 sql_bail!(
6174 "{} does not support type modifiers",
6175 catalog.resolve_full_name(entry.name()).to_string()
6176 );
6177 }
6178 match t {
6179 CatalogType::Array {
6180 element_reference: element_id,
6181 } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6182 catalog,
6183 *element_id,
6184 modifiers,
6185 )?))),
6186 CatalogType::List {
6187 element_reference: element_id,
6188 element_modifiers,
6189 } => Ok(SqlScalarType::List {
6190 element_type: Box::new(scalar_type_from_catalog(
6191 catalog,
6192 *element_id,
6193 element_modifiers,
6194 )?),
6195 custom_id: Some(id),
6196 }),
6197 CatalogType::Map {
6198 key_reference: _,
6199 key_modifiers: _,
6200 value_reference: value_id,
6201 value_modifiers,
6202 } => Ok(SqlScalarType::Map {
6203 value_type: Box::new(scalar_type_from_catalog(
6204 catalog,
6205 *value_id,
6206 value_modifiers,
6207 )?),
6208 custom_id: Some(id),
6209 }),
6210 CatalogType::Range {
6211 element_reference: element_id,
6212 } => Ok(SqlScalarType::Range {
6213 element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6214 }),
6215 CatalogType::Record { fields } => {
6216 let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6217 .iter()
6218 .map(|f| {
6219 let scalar_type = scalar_type_from_catalog(
6220 catalog,
6221 f.type_reference,
6222 &f.type_modifiers,
6223 )?;
6224 Ok((
6225 f.name.clone(),
6226 SqlColumnType {
6227 scalar_type,
6228 nullable: true,
6229 },
6230 ))
6231 })
6232 .collect::<Result<Box<_>, PlanError>>()?;
6233 Ok(SqlScalarType::Record {
6234 fields: scalars,
6235 custom_id: Some(id),
6236 })
6237 }
6238 CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6239 CatalogType::Bool => Ok(SqlScalarType::Bool),
6240 CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6241 CatalogType::Date => Ok(SqlScalarType::Date),
6242 CatalogType::Float32 => Ok(SqlScalarType::Float32),
6243 CatalogType::Float64 => Ok(SqlScalarType::Float64),
6244 CatalogType::Int16 => Ok(SqlScalarType::Int16),
6245 CatalogType::Int32 => Ok(SqlScalarType::Int32),
6246 CatalogType::Int64 => Ok(SqlScalarType::Int64),
6247 CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6248 CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6249 CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6250 CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6251 CatalogType::Interval => Ok(SqlScalarType::Interval),
6252 CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6253 CatalogType::Oid => Ok(SqlScalarType::Oid),
6254 CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6255 CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6256 CatalogType::Pseudo => {
6257 sql_bail!(
6258 "cannot reference pseudo type {}",
6259 catalog.resolve_full_name(entry.name()).to_string()
6260 )
6261 }
6262 CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6263 CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6264 CatalogType::RegType => Ok(SqlScalarType::RegType),
6265 CatalogType::String => Ok(SqlScalarType::String),
6266 CatalogType::Time => Ok(SqlScalarType::Time),
6267 CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6268 CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6269 CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6270 CatalogType::Numeric => unreachable!("handled above"),
6271 CatalogType::Char => unreachable!("handled above"),
6272 CatalogType::VarChar => unreachable!("handled above"),
6273 CatalogType::Timestamp => unreachable!("handled above"),
6274 CatalogType::TimestampTz => unreachable!("handled above"),
6275 }
6276 }
6277 }
6278}
6279
6280struct AggregateTableFuncVisitor<'a> {
6283 scx: &'a StatementContext<'a>,
6284 aggs: Vec<Function<Aug>>,
6285 within_aggregate: bool,
6286 tables: BTreeMap<Function<Aug>, String>,
6287 table_disallowed_context: Vec<&'static str>,
6288 in_select_item: bool,
6289 id_gen: IdGen,
6290 err: Option<PlanError>,
6291}
6292
6293impl<'a> AggregateTableFuncVisitor<'a> {
6294 fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6295 AggregateTableFuncVisitor {
6296 scx,
6297 aggs: Vec::new(),
6298 within_aggregate: false,
6299 tables: BTreeMap::new(),
6300 table_disallowed_context: Vec::new(),
6301 in_select_item: false,
6302 id_gen: Default::default(),
6303 err: None,
6304 }
6305 }
6306
6307 fn into_result(
6308 self,
6309 ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6310 match self.err {
6311 Some(err) => Err(err),
6312 None => {
6313 let mut seen = BTreeSet::new();
6316 let aggs = self
6317 .aggs
6318 .into_iter()
6319 .filter(move |agg| seen.insert(agg.clone()))
6320 .collect();
6321 Ok((aggs, self.tables))
6322 }
6323 }
6324 }
6325}
6326
6327impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
6328 fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
6329 let item = match self.scx.get_item_by_resolved_name(&func.name) {
6330 Ok(i) => i,
6331 Err(_) => return,
6333 };
6334
6335 match item.func() {
6336 Ok(Func::Aggregate { .. }) if func.over.is_none() => {
6339 if self.within_aggregate {
6340 self.err = Some(sql_err!("nested aggregate functions are not allowed",));
6341 return;
6342 }
6343 self.aggs.push(func.clone());
6344 let Function {
6345 name: _,
6346 args,
6347 filter,
6348 over: _,
6349 distinct: _,
6350 } = func;
6351 if let Some(filter) = filter {
6352 self.visit_expr_mut(filter);
6353 }
6354 let old_within_aggregate = self.within_aggregate;
6355 self.within_aggregate = true;
6356 self.table_disallowed_context
6357 .push("aggregate function calls");
6358
6359 self.visit_function_args_mut(args);
6360
6361 self.within_aggregate = old_within_aggregate;
6362 self.table_disallowed_context.pop();
6363 }
6364 Ok(Func::Table { .. }) => {
6365 self.table_disallowed_context.push("other table functions");
6366 visit_mut::visit_function_mut(self, func);
6367 self.table_disallowed_context.pop();
6368 }
6369 _ => visit_mut::visit_function_mut(self, func),
6370 }
6371 }
6372
6373 fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
6374 }
6376
6377 fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
6378 let (disallowed_context, func) = match expr {
6379 Expr::Case { .. } => (Some("CASE"), None),
6380 Expr::HomogenizingFunction {
6381 function: HomogenizingFunction::Coalesce,
6382 ..
6383 } => (Some("COALESCE"), None),
6384 Expr::Function(func) if self.in_select_item => {
6385 let mut table_func = None;
6388 if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
6389 if let Ok(Func::Table { .. }) = item.func() {
6390 if let Some(context) = self.table_disallowed_context.last() {
6391 self.err = Some(sql_err!(
6392 "table functions are not allowed in {} (function {})",
6393 context,
6394 func.name
6395 ));
6396 return;
6397 }
6398 table_func = Some(func.clone());
6399 }
6400 }
6401 (None, table_func)
6404 }
6405 _ => (None, None),
6406 };
6407 if let Some(func) = func {
6408 visit_mut::visit_expr_mut(self, expr);
6410 if let Function {
6412 name: _,
6413 args: _,
6414 filter: None,
6415 over: None,
6416 distinct: false,
6417 } = &func
6418 {
6419 let unique_id = self.id_gen.allocate_id();
6421 let id = self
6422 .tables
6423 .entry(func)
6424 .or_insert_with(|| format!("table_func_{unique_id}"));
6425 *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
6428 }
6429 }
6430 if let Some(context) = disallowed_context {
6431 self.table_disallowed_context.push(context);
6432 }
6433
6434 visit_mut::visit_expr_mut(self, expr);
6435
6436 if disallowed_context.is_some() {
6437 self.table_disallowed_context.pop();
6438 }
6439 }
6440
6441 fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
6442 let old = self.in_select_item;
6443 self.in_select_item = true;
6444 visit_mut::visit_select_item_mut(self, si);
6445 self.in_select_item = old;
6446 }
6447}
6448
6449#[derive(Default)]
6450struct WindowFuncCollector {
6451 window_funcs: Vec<Expr<Aug>>,
6452}
6453
6454impl WindowFuncCollector {
6455 fn into_result(self) -> Vec<Expr<Aug>> {
6456 let mut seen = BTreeSet::new();
6458 let window_funcs_dedupped = self
6459 .window_funcs
6460 .into_iter()
6461 .filter(move |expr| seen.insert(expr.clone()))
6462 .rev()
6465 .collect();
6466 window_funcs_dedupped
6467 }
6468}
6469
6470impl Visit<'_, Aug> for WindowFuncCollector {
6471 fn visit_expr(&mut self, expr: &Expr<Aug>) {
6472 match expr {
6473 Expr::Function(func) => {
6474 if func.over.is_some() {
6475 self.window_funcs.push(expr.clone());
6476 }
6477 }
6478 _ => (),
6479 }
6480 visit::visit_expr(self, expr);
6481 }
6482
6483 fn visit_query(&mut self, _query: &Query<Aug>) {
6484 }
6486}
6487
6488#[derive(Debug, Eq, PartialEq, Copy, Clone)]
6490pub enum QueryLifetime {
6491 OneShot,
6493 Index,
6495 MaterializedView,
6497 Subscribe,
6499 View,
6501 Source,
6503}
6504
6505impl QueryLifetime {
6506 pub fn is_one_shot(&self) -> bool {
6510 let result = match self {
6511 QueryLifetime::OneShot => true,
6512 QueryLifetime::Index => false,
6513 QueryLifetime::MaterializedView => false,
6514 QueryLifetime::Subscribe => false,
6515 QueryLifetime::View => false,
6516 QueryLifetime::Source => false,
6517 };
6518 assert_eq!(!result, self.is_maintained());
6519 result
6520 }
6521
6522 pub fn is_maintained(&self) -> bool {
6525 match self {
6526 QueryLifetime::OneShot => false,
6527 QueryLifetime::Index => true,
6528 QueryLifetime::MaterializedView => true,
6529 QueryLifetime::Subscribe => true,
6530 QueryLifetime::View => true,
6531 QueryLifetime::Source => true,
6532 }
6533 }
6534
6535 pub fn allow_show(&self) -> bool {
6537 match self {
6538 QueryLifetime::OneShot => true,
6539 QueryLifetime::Index => false,
6540 QueryLifetime::MaterializedView => false,
6541 QueryLifetime::Subscribe => true, QueryLifetime::View => false,
6543 QueryLifetime::Source => false,
6544 }
6545 }
6546}
6547
6548#[derive(Debug, Clone)]
6550pub struct CteDesc {
6551 pub name: String,
6552 pub desc: RelationDesc,
6553}
6554
6555#[derive(Debug, Clone)]
6557pub struct QueryContext<'a> {
6558 pub scx: &'a StatementContext<'a>,
6560 pub lifetime: QueryLifetime,
6562 pub outer_scopes: Vec<Scope>,
6564 pub outer_relation_types: Vec<SqlRelationType>,
6566 pub ctes: BTreeMap<LocalId, CteDesc>,
6568 pub name_manager: Rc<RefCell<NameManager>>,
6570 pub recursion_guard: RecursionGuard,
6571}
6572
6573impl CheckedRecursion for QueryContext<'_> {
6574 fn recursion_guard(&self) -> &RecursionGuard {
6575 &self.recursion_guard
6576 }
6577}
6578
6579impl<'a> QueryContext<'a> {
6580 pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6581 QueryContext {
6582 scx,
6583 lifetime,
6584 outer_scopes: vec![],
6585 outer_relation_types: vec![],
6586 ctes: BTreeMap::new(),
6587 name_manager: Rc::new(RefCell::new(NameManager::new())),
6588 recursion_guard: RecursionGuard::with_limit(1024), }
6590 }
6591
6592 fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6593 expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6594 }
6595
6596 fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6599 let ctes = self.ctes.clone();
6600 let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6601 let outer_relation_types = iter::once(relation_type)
6602 .chain(self.outer_relation_types.clone())
6603 .collect();
6604 let name_manager = Rc::clone(&self.name_manager);
6606
6607 QueryContext {
6608 scx: self.scx,
6609 lifetime: self.lifetime,
6610 outer_scopes,
6611 outer_relation_types,
6612 ctes,
6613 name_manager,
6614 recursion_guard: self.recursion_guard.clone(),
6615 }
6616 }
6617
6618 fn empty_derived_context(&self) -> QueryContext<'a> {
6620 let scope = Scope::empty();
6621 let ty = SqlRelationType::empty();
6622 self.derived_context(scope, ty)
6623 }
6624
6625 pub fn resolve_table_name(
6628 &self,
6629 object: ResolvedItemName,
6630 ) -> Result<(HirRelationExpr, Scope), PlanError> {
6631 match object {
6632 ResolvedItemName::Item {
6633 id,
6634 full_name,
6635 version,
6636 ..
6637 } => {
6638 let item = self.scx.get_item(&id).at_version(version);
6639 let desc = match item.relation_desc() {
6640 Some(desc) => desc.clone(),
6641 None => {
6642 return Err(PlanError::InvalidDependency {
6643 name: full_name.to_string(),
6644 item_type: item.item_type().to_string(),
6645 });
6646 }
6647 };
6648 let expr = HirRelationExpr::Get {
6649 id: Id::Global(item.global_id()),
6650 typ: desc.typ().clone(),
6651 };
6652
6653 let name = full_name.into();
6654 let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6655
6656 Ok((expr, scope))
6657 }
6658 ResolvedItemName::Cte { id, name } => {
6659 let name = name.into();
6660 let cte = self.ctes.get(&id).unwrap();
6661 let expr = HirRelationExpr::Get {
6662 id: Id::Local(id),
6663 typ: cte.desc.typ().clone(),
6664 };
6665
6666 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6667
6668 Ok((expr, scope))
6669 }
6670 ResolvedItemName::Error => bail_internal!("should have been caught in name resolution"),
6671 }
6672 }
6673
6674 pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6677 self.scx.humanize_sql_scalar_type(typ, postgres_compat)
6678 }
6679}
6680
6681#[derive(Debug, Clone)]
6683pub struct ExprContext<'a> {
6684 pub qcx: &'a QueryContext<'a>,
6685 pub name: &'a str,
6687 pub scope: &'a Scope,
6690 pub relation_type: &'a SqlRelationType,
6693 pub allow_aggregates: bool,
6695 pub allow_subqueries: bool,
6697 pub allow_parameters: bool,
6699 pub allow_windows: bool,
6701}
6702
6703impl CheckedRecursion for ExprContext<'_> {
6704 fn recursion_guard(&self) -> &RecursionGuard {
6705 &self.qcx.recursion_guard
6706 }
6707}
6708
6709impl<'a> ExprContext<'a> {
6710 pub fn catalog(&self) -> &dyn SessionCatalog {
6711 self.qcx.scx.catalog
6712 }
6713
6714 pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6715 let mut ecx = self.clone();
6716 ecx.name = name;
6717 ecx
6718 }
6719
6720 pub fn column_type<E>(&self, expr: &E) -> E::Type
6721 where
6722 E: AbstractExpr,
6723 {
6724 expr.typ(
6725 &self.qcx.outer_relation_types,
6726 self.relation_type,
6727 &self.qcx.scx.param_types.borrow(),
6728 )
6729 }
6730
6731 pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6732 where
6733 E: AbstractExpr,
6734 {
6735 self.column_type(expr).scalar_type()
6736 }
6737
6738 fn derived_query_context(&self) -> QueryContext<'_> {
6739 let mut scope = self.scope.clone();
6740 scope.lateral_barrier = true;
6741 self.qcx.derived_context(scope, self.relation_type.clone())
6742 }
6743
6744 pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6745 self.qcx.scx.require_feature_flag(flag)
6746 }
6747
6748 pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6749 &self.qcx.scx.param_types
6750 }
6751
6752 pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6755 self.qcx.scx.humanize_sql_scalar_type(typ, postgres_compat)
6756 }
6757
6758 pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6759 self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6760 }
6761}
6762
6763#[derive(Debug, Clone)]
6769pub struct NameManager(BTreeSet<Arc<str>>);
6770
6771impl NameManager {
6772 pub fn new() -> Self {
6774 Self(BTreeSet::new())
6775 }
6776
6777 fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6780 let s = s.as_ref();
6781 if let Some(interned) = self.0.get(s) {
6782 Arc::clone(interned)
6783 } else {
6784 let interned: Arc<str> = Arc::from(s);
6785 self.0.insert(Arc::clone(&interned));
6786 interned
6787 }
6788 }
6789
6790 pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6793 self.intern(item.column_name.as_str())
6809 }
6810}
6811
6812#[cfg(test)]
6813mod test {
6814 use super::*;
6815
6816 #[mz_ore::test]
6821 pub fn test_name_manager_string_interning() {
6822 let mut nm = NameManager::new();
6823
6824 let orig_hi = "hi";
6825 let hi = nm.intern(orig_hi);
6826 let hello = nm.intern("hello");
6827
6828 assert_ne!(hi.as_ptr(), hello.as_ptr());
6829
6830 let hi2 = nm.intern("hi");
6832 assert_eq!(hi.as_ptr(), hi2.as_ptr());
6833
6834 let s = format!(
6836 "{}{}",
6837 hi.chars().nth(0).unwrap(),
6838 hi2.chars().nth(1).unwrap()
6839 );
6840 assert_ne!(orig_hi.as_ptr(), s.as_ptr());
6842
6843 let hi3 = nm.intern(s);
6844 assert_eq!(hi.as_ptr(), hi3.as_ptr());
6845 }
6846}