1use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::func::variadic::{
50 ArrayCreate, ArrayIndex, Coalesce, Greatest, Least, ListCreate, ListIndex, ListSliceLinear,
51 MapBuild, RecordCreate,
52};
53use mz_expr::virtual_syntax::AlgExcept;
54use mz_expr::{
55 Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, REPEAT_ROW_NAME, RowSetFinishing,
56 TableFunc, func as expr_func,
57};
58use mz_ore::assert_none;
59use mz_ore::collections::CollectionExt;
60use mz_ore::error::ErrorExt;
61use mz_ore::id_gen::IdGen;
62use mz_ore::option::FallibleMapExt;
63use mz_ore::stack::{CheckedRecursion, RecursionGuard};
64use mz_ore::str::StrExt;
65use mz_repr::adt::char::CharLength;
66use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
67use mz_repr::adt::timestamp::TimestampPrecision;
68use mz_repr::adt::varchar::VarCharMaxLength;
69use mz_repr::namespaces::MZ_CATALOG_SCHEMA;
70use mz_repr::{
71 CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector,
72 ReprColumnType, Row, RowArena, SqlColumnType, SqlRelationType, SqlScalarType,
73 UNKNOWN_COLUMN_NAME, strconv,
74};
75use mz_sql_parser::ast::display::AstDisplay;
76use mz_sql_parser::ast::visit::Visit;
77use mz_sql_parser::ast::visit_mut::{self, VisitMut};
78use mz_sql_parser::ast::{
79 AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
80 CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
81 Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
82 JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
83 MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
84 SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
85 TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
86 WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
87};
88use mz_sql_parser::ident;
89
90use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
91use crate::func::{self, Func, FuncSpec, TableFuncImpl};
92use crate::names::{
93 Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
94};
95use crate::plan::PlanError::InvalidWmrRecursionLimit;
96use crate::plan::error::PlanError;
97use crate::plan::hir::{
98 AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
99 BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
100 HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
101 ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
102};
103use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
104use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
105use crate::plan::statement::{StatementContext, StatementDesc, show};
106use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
107use crate::plan::{
108 Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
109 literal, transform_ast,
110};
111use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
112use crate::session::vars::{self, FeatureFlag};
113use crate::{ORDINALITY_COL_NAME, normalize};
114
115#[derive(Debug)]
116pub struct PlannedRootQuery<E> {
117 pub expr: E,
118 pub desc: RelationDesc,
119 pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
120 pub scope: Scope,
121}
122
123#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
132pub fn plan_root_query(
133 scx: &StatementContext,
134 mut query: Query<Aug>,
135 lifetime: QueryLifetime,
136) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
137 transform_ast::transform(scx, &mut query)?;
138 let mut qcx = QueryContext::root(scx, lifetime);
139 let PlannedQuery {
140 mut expr,
141 scope,
142 order_by,
143 limit,
144 offset,
145 project,
146 group_size_hints,
147 } = plan_query(&mut qcx, &query)?;
148
149 let mut finishing = RowSetFinishing {
150 limit,
151 offset,
152 project,
153 order_by,
154 };
155
156 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
162
163 if lifetime.is_maintained() {
164 expr.finish_maintained(&mut finishing, group_size_hints);
165 }
166
167 let typ = qcx.relation_type(&expr);
168 let typ = SqlRelationType::new(
169 finishing
170 .project
171 .iter()
172 .map(|i| typ.column_types[*i].clone())
173 .collect(),
174 );
175 let desc = RelationDesc::new(typ, scope.column_names());
176
177 Ok(PlannedRootQuery {
178 expr,
179 desc,
180 finishing,
181 scope,
182 })
183}
184
185fn try_push_projection_order_by(
196 expr: &mut HirRelationExpr,
197 project: &mut Vec<usize>,
198 order_by: &mut Vec<ColumnOrder>,
199) -> bool {
200 let mut unproject = vec![None; expr.arity()];
201 for (out_i, in_i) in project.iter().copied().enumerate() {
202 unproject[in_i] = Some(out_i);
203 }
204 if order_by
205 .iter()
206 .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
207 {
208 let trivial_project = (0..project.len()).collect();
209 *expr = expr.take().project(mem::replace(project, trivial_project));
210 for ob in order_by {
211 ob.column = unproject[ob.column].unwrap();
212 }
213 true
214 } else {
215 false
216 }
217}
218
219pub fn plan_insert_query(
220 scx: &StatementContext,
221 table_name: ResolvedItemName,
222 columns: Vec<Ident>,
223 source: InsertSource<Aug>,
224 returning: Vec<SelectItem<Aug>>,
225) -> Result<
226 (
227 CatalogItemId,
228 HirRelationExpr,
229 PlannedRootQuery<Vec<HirScalarExpr>>,
230 ),
231 PlanError,
232> {
233 let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
234 let table = scx.get_item_by_resolved_name(&table_name)?;
235
236 if table.item_type() != CatalogItemType::Table {
238 sql_bail!(
239 "cannot insert into {} '{}'",
240 table.item_type(),
241 table_name.full_name_str()
242 );
243 }
244 let desc = table
245 .relation_desc()
246 .ok_or_else(|| sql_err!("item does not have a relation description"))?;
247 let mut defaults = table
248 .writable_table_details()
249 .ok_or_else(|| {
250 sql_err!(
251 "cannot insert into non-writeable table '{}'",
252 table_name.full_name_str()
253 )
254 })?
255 .to_vec();
256
257 for default in &mut defaults {
258 transform_ast::transform(scx, default)?;
259 }
260
261 if table.id().is_system() {
262 sql_bail!(
263 "cannot insert into system table '{}'",
264 table_name.full_name_str()
265 );
266 }
267
268 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
269
270 let mut source_types = Vec::with_capacity(columns.len());
272 let mut ordering = Vec::with_capacity(columns.len());
273
274 if columns.is_empty() {
275 source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
278 ordering.extend(0..desc.arity());
279 } else {
280 let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
281 .iter()
282 .enumerate()
283 .map(|(idx, (name, typ))| (name, (idx, typ)))
284 .collect();
285
286 for c in &columns {
287 if let Some((idx, typ)) = column_by_name.get(c) {
288 ordering.push(*idx);
289 source_types.push(&typ.scalar_type);
290 } else {
291 sql_bail!(
292 "column {} of relation {} does not exist",
293 c.quoted(),
294 table_name.full_name_str().quoted()
295 );
296 }
297 }
298 if let Some(dup) = columns.iter().duplicates().next() {
299 sql_bail!("column {} specified more than once", dup.quoted());
300 }
301 };
302
303 let expr = match source {
305 InsertSource::Query(mut query) => {
306 transform_ast::transform(scx, &mut query)?;
307
308 match query {
309 Query {
311 body: SetExpr::Values(Values(values)),
312 ctes,
313 order_by,
314 limit: None,
315 offset: None,
316 } if ctes.is_empty() && order_by.is_empty() => {
317 let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
318 plan_values_insert(&qcx, &names, &source_types, &values)?
319 }
320 _ => {
321 let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
322 expr
323 }
324 }
325 }
326 InsertSource::DefaultValues => {
327 HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
328 }
329 };
330
331 let expr_arity = expr.arity();
332
333 let max_columns = if columns.is_empty() {
336 desc.arity()
337 } else {
338 columns.len()
339 };
340 if expr_arity > max_columns {
341 sql_bail!("INSERT has more expressions than target columns");
342 }
343 if expr_arity < columns.len() {
345 sql_bail!("INSERT has more target columns than expressions");
346 }
347
348 source_types.truncate(expr_arity);
350 ordering.truncate(expr_arity);
351
352 let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
355 sql_err!(
356 "column {} is of type {} but expression is of type {}",
357 desc.get_name(ordering[e.column]).quoted(),
358 qcx.humanize_sql_scalar_type(&e.target_type, false),
359 qcx.humanize_sql_scalar_type(&e.source_type, false),
360 )
361 })?;
362
363 let mut map_exprs = vec![];
365 let mut project_key = Vec::with_capacity(desc.arity());
366
367 let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();
369
370 let column_details = desc.iter_types().zip_eq(defaults).enumerate();
371 for (col_idx, (col_typ, default)) in column_details {
372 if let Some(src_idx) = col_to_source.get(&col_idx) {
373 project_key.push(*src_idx);
374 } else {
375 let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
376 project_key.push(expr_arity + map_exprs.len());
377 map_exprs.push(hir);
378 }
379 }
380
381 let returning = {
382 let (scope, typ) = if let ResolvedItemName::Item {
383 full_name,
384 version: _,
385 ..
386 } = table_name
387 {
388 let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
389 let typ = desc.typ().clone();
390 (scope, typ)
391 } else {
392 (Scope::empty(), SqlRelationType::empty())
393 };
394 let ecx = &ExprContext {
395 qcx: &qcx,
396 name: "RETURNING clause",
397 scope: &scope,
398 relation_type: &typ,
399 allow_aggregates: false,
400 allow_subqueries: false,
401 allow_parameters: true,
402 allow_windows: false,
403 };
404 let table_func_names = BTreeMap::new();
405 let mut output_columns = vec![];
406 let mut new_exprs = vec![];
407 let mut new_type = SqlRelationType::empty();
408 for mut si in returning {
409 transform_ast::transform(scx, &mut si)?;
410 for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
411 let expr = match &select_item {
412 ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
413 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
414 };
415 output_columns.push(column_name);
416 let typ = ecx.column_type(&expr);
417 new_type.column_types.push(typ);
418 new_exprs.push(expr);
419 }
420 }
421 let desc = RelationDesc::new(new_type, output_columns);
422 let desc_arity = desc.arity();
423 PlannedRootQuery {
424 expr: new_exprs,
425 desc,
426 finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
427 scope,
428 }
429 };
430
431 Ok((
432 table.id(),
433 expr.map(map_exprs).project(project_key),
434 returning,
435 ))
436}
437
438pub fn plan_copy_item(
449 scx: &StatementContext,
450 item_name: ResolvedItemName,
451 columns: Vec<Ident>,
452) -> Result<
453 (
454 CatalogItemId,
455 RelationDesc,
456 Vec<ColumnIndex>,
457 Option<MapFilterProject>,
458 ),
459 PlanError,
460> {
461 let item = scx.get_item_by_resolved_name(&item_name)?;
462 let fullname = scx.catalog.resolve_full_name(item.name());
463 let table_desc = match item.relation_desc() {
464 Some(desc) => desc.into_owned(),
465 None => {
466 return Err(PlanError::InvalidDependency {
467 name: fullname.to_string(),
468 item_type: item.item_type().to_string(),
469 });
470 }
471 };
472 let mut ordering = Vec::with_capacity(columns.len());
473
474 let mfp = if let Some(table_defaults) = item.writable_table_details() {
484 let mut table_defaults = table_defaults.to_vec();
485
486 for default in &mut table_defaults {
487 transform_ast::transform(scx, default)?;
488 }
489
490 let source_column_names: Vec<_> = columns
492 .iter()
493 .cloned()
494 .map(normalize::column_name)
495 .collect();
496
497 let mut default_exprs = Vec::new();
498 let mut project_keys = Vec::with_capacity(table_desc.arity());
499
500 let column_details = table_desc.iter().zip_eq(table_defaults);
503 for ((col_name, col_type), col_default) in column_details {
504 let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
505 if let Some(src_idx) = maybe_src_idx {
506 project_keys.push(src_idx);
507 } else {
508 let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
511 let mir = hir.lower_uncorrelated(scx.catalog.system_vars())?;
512 project_keys.push(source_column_names.len() + default_exprs.len());
513 default_exprs.push(mir);
514 }
515 }
516
517 let mfp = MapFilterProject::new(source_column_names.len())
518 .map(default_exprs)
519 .project(project_keys);
520 Some(mfp)
521 } else {
522 None
523 };
524
525 let source_desc = if columns.is_empty() {
527 let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
528 ordering.extend(indexes);
529
530 table_desc
532 } else {
533 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
534 let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
535 .iter_all()
536 .map(|(idx, name, typ)| (name, (*idx, typ)))
537 .collect();
538
539 let mut names = Vec::with_capacity(columns.len());
540 let mut source_types = Vec::with_capacity(columns.len());
541
542 for c in &columns {
543 if let Some((idx, typ)) = column_by_name.get(c) {
544 ordering.push(*idx);
545 source_types.push((*typ).clone());
546 names.push(c.clone());
547 } else {
548 sql_bail!(
549 "column {} of relation {} does not exist",
550 c.quoted(),
551 item_name.full_name_str().quoted()
552 );
553 }
554 }
555 if let Some(dup) = columns.iter().duplicates().next() {
556 sql_bail!("column {} specified more than once", dup.quoted());
557 }
558
559 RelationDesc::new(SqlRelationType::new(source_types), names)
561 };
562
563 Ok((item.id(), source_desc, ordering, mfp))
564}
565
566pub fn plan_copy_from(
570 scx: &StatementContext,
571 table_name: ResolvedItemName,
572 columns: Vec<Ident>,
573) -> Result<
574 (
575 CatalogItemId,
576 RelationDesc,
577 Vec<ColumnIndex>,
578 Option<MapFilterProject>,
579 ),
580 PlanError,
581> {
582 let table = scx.get_item_by_resolved_name(&table_name)?;
583
584 if table.item_type() != CatalogItemType::Table {
586 sql_bail!(
587 "cannot insert into {} '{}'",
588 table.item_type(),
589 table_name.full_name_str()
590 );
591 }
592
593 let _ = table.writable_table_details().ok_or_else(|| {
594 sql_err!(
595 "cannot insert into non-writeable table '{}'",
596 table_name.full_name_str()
597 )
598 })?;
599
600 if table.id().is_system() {
601 sql_bail!(
602 "cannot insert into system table '{}'",
603 table_name.full_name_str()
604 );
605 }
606 let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
607
608 Ok((id, desc, ordering, mfp))
609}
610
611pub fn plan_copy_from_rows(
614 pcx: &PlanContext,
615 catalog: &dyn SessionCatalog,
616 target_id: CatalogItemId,
617 target_name: String,
618 columns: Vec<ColumnIndex>,
619 rows: Vec<mz_repr::Row>,
620) -> Result<HirRelationExpr, PlanError> {
621 let scx = StatementContext::new(Some(pcx), catalog);
622
623 let table = catalog
625 .try_get_item(&target_id)
626 .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
627 .at_version(RelationVersionSelector::Latest);
628
629 let mut defaults = table
630 .writable_table_details()
631 .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
632 .to_vec();
633
634 for default in &mut defaults {
635 transform_ast::transform(&scx, default)?;
636 }
637
638 let desc = table
639 .relation_desc()
640 .ok_or_else(|| sql_err!("item does not have a relation description"))?;
641 let column_types = columns
642 .iter()
643 .map(|x| desc.get_type(x).clone())
644 .map(|mut x| {
645 x.nullable = true;
648 x
649 })
650 .collect();
651 let typ = SqlRelationType::new(column_types);
652 let expr = HirRelationExpr::Constant {
653 rows,
654 typ: typ.clone(),
655 };
656
657 let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
663 if columns == default {
664 return Ok(expr);
665 }
666
667 let mut map_exprs = vec![];
669 let mut project_key = Vec::with_capacity(desc.arity());
670
671 let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
673
674 let column_details = desc.iter_all().zip_eq(defaults);
675 for ((col_idx, _col_name, col_typ), default) in column_details {
676 if let Some(src_idx) = col_to_source.get(&col_idx) {
677 project_key.push(*src_idx);
678 } else {
679 let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
680 project_key.push(typ.arity() + map_exprs.len());
681 map_exprs.push(hir);
682 }
683 }
684
685 Ok(expr.map(map_exprs).project(project_key))
686}
687
688pub struct ReadThenWritePlan {
690 pub id: CatalogItemId,
691 pub selection: HirRelationExpr,
696 pub assignments: BTreeMap<usize, HirScalarExpr>,
698 pub finishing: RowSetFinishing,
699}
700
701pub fn plan_delete_query(
702 scx: &StatementContext,
703 mut delete_stmt: DeleteStatement<Aug>,
704) -> Result<ReadThenWritePlan, PlanError> {
705 transform_ast::transform(scx, &mut delete_stmt)?;
706
707 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
708 plan_mutation_query_inner(
709 qcx,
710 delete_stmt.table_name,
711 delete_stmt.alias,
712 delete_stmt.using,
713 vec![],
714 delete_stmt.selection,
715 )
716}
717
718pub fn plan_update_query(
719 scx: &StatementContext,
720 mut update_stmt: UpdateStatement<Aug>,
721) -> Result<ReadThenWritePlan, PlanError> {
722 transform_ast::transform(scx, &mut update_stmt)?;
723
724 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
725
726 plan_mutation_query_inner(
727 qcx,
728 update_stmt.table_name,
729 update_stmt.alias,
730 vec![],
731 update_stmt.assignments,
732 update_stmt.selection,
733 )
734}
735
736pub fn plan_mutation_query_inner(
737 qcx: QueryContext,
738 table_name: ResolvedItemName,
739 alias: Option<TableAlias>,
740 using: Vec<TableWithJoins<Aug>>,
741 assignments: Vec<Assignment<Aug>>,
742 selection: Option<Expr<Aug>>,
743) -> Result<ReadThenWritePlan, PlanError> {
744 let (id, version) = match table_name {
746 ResolvedItemName::Item { id, version, .. } => (id, version),
747 _ => sql_bail!("cannot mutate non-user table"),
748 };
749
750 let item = qcx.scx.get_item(&id).at_version(version);
752 if item.item_type() != CatalogItemType::Table {
753 sql_bail!(
754 "cannot mutate {} '{}'",
755 item.item_type(),
756 table_name.full_name_str()
757 );
758 }
759 let _ = item.writable_table_details().ok_or_else(|| {
760 sql_err!(
761 "cannot mutate non-writeable table '{}'",
762 table_name.full_name_str()
763 )
764 })?;
765 if id.is_system() {
766 sql_bail!(
767 "cannot mutate system table '{}'",
768 table_name.full_name_str()
769 );
770 }
771
772 let (mut get, scope) = qcx.resolve_table_name(table_name)?;
774 let scope = plan_table_alias(scope, alias.as_ref())?;
775 let desc = item.relation_desc().expect("table has desc");
776 let relation_type = qcx.relation_type(&get);
777
778 if using.is_empty() {
779 if let Some(expr) = selection {
780 let ecx = &ExprContext {
781 qcx: &qcx,
782 name: "WHERE clause",
783 scope: &scope,
784 relation_type: &relation_type,
785 allow_aggregates: false,
786 allow_subqueries: true,
787 allow_parameters: true,
788 allow_windows: false,
789 };
790 let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
791 get = get.filter(vec![expr]);
792 }
793 } else {
794 get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
795 }
796
797 let mut sets = BTreeMap::new();
798 for Assignment { id, value } in assignments {
799 let name = normalize::column_name(id);
801 match desc.get_by_name(&name) {
802 Some((idx, typ)) => {
803 let ecx = &ExprContext {
804 qcx: &qcx,
805 name: "SET clause",
806 scope: &scope,
807 relation_type: &relation_type,
808 allow_aggregates: false,
809 allow_subqueries: false,
810 allow_parameters: true,
811 allow_windows: false,
812 };
813 let expr = plan_expr(ecx, &value)?.cast_to(
814 ecx,
815 CastContext::Assignment,
816 &typ.scalar_type,
817 )?;
818
819 if sets.insert(idx, expr).is_some() {
820 sql_bail!("column {} set twice", name)
821 }
822 }
823 None => sql_bail!("unknown column {}", name),
824 };
825 }
826
827 let finishing = RowSetFinishing {
828 order_by: vec![],
829 limit: None,
830 offset: 0,
831 project: (0..desc.arity()).collect(),
832 };
833
834 Ok(ReadThenWritePlan {
835 id,
836 selection: get,
837 finishing,
838 assignments: sets,
839 })
840}
841
842fn handle_mutation_using_clause(
854 qcx: &QueryContext,
855 selection: Option<Expr<Aug>>,
856 using: Vec<TableWithJoins<Aug>>,
857 get: HirRelationExpr,
858 outer_scope: Scope,
859) -> Result<HirRelationExpr, PlanError> {
860 let (mut using_rel_expr, using_scope) =
864 using.into_iter().try_fold(plan_join_identity(), |l, twj| {
865 let (left, left_scope) = l;
866 plan_join(
867 qcx,
868 left,
869 left_scope,
870 &Join {
871 relation: TableFactor::NestedJoin {
872 join: Box::new(twj),
873 alias: None,
874 },
875 join_operator: JoinOperator::CrossJoin,
876 },
877 )
878 })?;
879
880 if let Some(expr) = selection {
881 let on = HirScalarExpr::literal_true();
887 let joined = using_rel_expr
888 .clone()
889 .join(get.clone(), on, JoinKind::Inner);
890 let joined_scope = using_scope.product(outer_scope)?;
891 let joined_relation_type = qcx.relation_type(&joined);
892
893 let ecx = &ExprContext {
894 qcx,
895 name: "WHERE clause",
896 scope: &joined_scope,
897 relation_type: &joined_relation_type,
898 allow_aggregates: false,
899 allow_subqueries: true,
900 allow_parameters: true,
901 allow_windows: false,
902 };
903
904 let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
906
907 let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
911 use mz_expr::visit::Visit;
913 expr.visit_mut_post(&mut |e| {
914 if let HirScalarExpr::Column(c, _name) = e {
915 if c.column >= using_rel_arity {
916 c.level += 1;
917 c.column -= using_rel_arity;
918 };
919 }
920 })?;
921
922 using_rel_expr = using_rel_expr.filter(vec![expr]);
926 } else {
927 let _joined_scope = using_scope.product(outer_scope)?;
930 }
931 Ok(get.filter(vec![using_rel_expr.exists()]))
942}
943
944#[derive(Debug)]
945pub(crate) struct CastRelationError {
946 pub(crate) column: usize,
947 pub(crate) source_type: SqlScalarType,
948 pub(crate) target_type: SqlScalarType,
949}
950
951pub(crate) fn cast_relation<'a, I>(
955 qcx: &QueryContext,
956 ccx: CastContext,
957 expr: HirRelationExpr,
958 target_types: I,
959) -> Result<HirRelationExpr, CastRelationError>
960where
961 I: IntoIterator<Item = &'a SqlScalarType>,
962{
963 let ecx = &ExprContext {
964 qcx,
965 name: "values",
966 scope: &Scope::empty(),
967 relation_type: &qcx.relation_type(&expr),
968 allow_aggregates: false,
969 allow_subqueries: true,
970 allow_parameters: true,
971 allow_windows: false,
972 };
973 let mut map_exprs = vec![];
974 let mut project_key = vec![];
975 for (i, target_typ) in target_types.into_iter().enumerate() {
976 let expr = HirScalarExpr::column(i);
977 match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
981 Ok(cast_expr) => {
982 if expr == cast_expr {
983 project_key.push(i);
985 } else {
986 project_key.push(ecx.relation_type.arity() + map_exprs.len());
988 map_exprs.push(cast_expr);
989 }
990 }
991 Err(_) => {
992 return Err(CastRelationError {
993 column: i,
994 source_type: ecx.scalar_type(&expr),
995 target_type: target_typ.clone(),
996 });
997 }
998 }
999 }
1000 Ok(expr.map(map_exprs).project(project_key))
1001}
1002
1003pub fn plan_as_of(
1006 scx: &StatementContext,
1007 as_of: Option<AsOf<Aug>>,
1008) -> Result<QueryWhen, PlanError> {
1009 match as_of {
1010 None => Ok(QueryWhen::Immediately),
1011 Some(as_of) => match as_of {
1012 AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1013 AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1014 },
1015 }
1016}
1017
1018pub fn plan_as_of_or_up_to(
1028 scx: &StatementContext,
1029 mut expr: Expr<Aug>,
1030) -> Result<mz_repr::Timestamp, PlanError> {
1031 let scope = Scope::empty();
1032 let desc = RelationDesc::empty();
1033 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1036 transform_ast::transform(scx, &mut expr)?;
1037 let ecx = &ExprContext {
1038 qcx: &qcx,
1039 name: "AS OF or UP TO",
1040 scope: &scope,
1041 relation_type: desc.typ(),
1042 allow_aggregates: false,
1043 allow_subqueries: false,
1044 allow_parameters: false,
1045 allow_windows: false,
1046 };
1047 let hir = plan_expr(ecx, &expr)?.cast_to(
1048 ecx,
1049 CastContext::Assignment,
1050 &SqlScalarType::MzTimestamp,
1051 )?;
1052 if hir.contains_unmaterializable() {
1053 bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1054 }
1055 let timestamp = hir
1062 .into_literal_mz_timestamp()
1063 .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1064 Ok(timestamp)
1065}
1066
1067pub fn plan_secret_as(
1069 scx: &StatementContext,
1070 mut expr: Expr<Aug>,
1071) -> Result<MirScalarExpr, PlanError> {
1072 let scope = Scope::empty();
1073 let desc = RelationDesc::empty();
1074 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1075
1076 transform_ast::transform(scx, &mut expr)?;
1077
1078 let ecx = &ExprContext {
1079 qcx: &qcx,
1080 name: "AS",
1081 scope: &scope,
1082 relation_type: desc.typ(),
1083 allow_aggregates: false,
1084 allow_subqueries: false,
1085 allow_parameters: false,
1086 allow_windows: false,
1087 };
1088 let expr = plan_expr(ecx, &expr)?
1089 .type_as(ecx, &SqlScalarType::Bytes)?
1090 .lower_uncorrelated(scx.catalog.system_vars())?;
1091 Ok(expr)
1092}
1093
1094pub fn plan_webhook_validate_using(
1096 scx: &StatementContext,
1097 validate_using: CreateWebhookSourceCheck<Aug>,
1098) -> Result<WebhookValidation, PlanError> {
1099 let qcx = QueryContext::root(scx, QueryLifetime::Source);
1100
1101 let CreateWebhookSourceCheck {
1102 options,
1103 using: mut expr,
1104 } = validate_using;
1105
1106 let mut column_typs = vec![];
1107 let mut column_names = vec![];
1108
1109 let (bodies, headers, secrets) = options
1110 .map(|o| (o.bodies, o.headers, o.secrets))
1111 .unwrap_or_default();
1112
1113 let mut body_tuples = vec![];
1115 for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1116 let scalar_type = use_bytes
1117 .then_some(SqlScalarType::Bytes)
1118 .unwrap_or(SqlScalarType::String);
1119 let name = alias
1120 .map(|a| a.into_string())
1121 .unwrap_or_else(|| "body".to_string());
1122
1123 column_typs.push(SqlColumnType {
1124 scalar_type,
1125 nullable: false,
1126 });
1127 column_names.push(name);
1128
1129 let column_idx = column_typs.len() - 1;
1131 assert_eq!(
1133 column_idx,
1134 column_names.len() - 1,
1135 "body column names and types don't match"
1136 );
1137 body_tuples.push((column_idx, use_bytes));
1138 }
1139
1140 let mut header_tuples = vec![];
1142
1143 for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1144 let value_type = use_bytes
1145 .then_some(SqlScalarType::Bytes)
1146 .unwrap_or(SqlScalarType::String);
1147 let name = alias
1148 .map(|a| a.into_string())
1149 .unwrap_or_else(|| "headers".to_string());
1150
1151 column_typs.push(SqlColumnType {
1152 scalar_type: SqlScalarType::Map {
1153 value_type: Box::new(value_type),
1154 custom_id: None,
1155 },
1156 nullable: false,
1157 });
1158 column_names.push(name);
1159
1160 let column_idx = column_typs.len() - 1;
1162 assert_eq!(
1164 column_idx,
1165 column_names.len() - 1,
1166 "header column names and types don't match"
1167 );
1168 header_tuples.push((column_idx, use_bytes));
1169 }
1170
1171 let mut validation_secrets = vec![];
1173
1174 for CreateWebhookSourceSecret {
1175 secret,
1176 alias,
1177 use_bytes,
1178 } in secrets
1179 {
1180 let scalar_type = use_bytes
1182 .then_some(SqlScalarType::Bytes)
1183 .unwrap_or(SqlScalarType::String);
1184
1185 column_typs.push(SqlColumnType {
1186 scalar_type,
1187 nullable: false,
1188 });
1189 let ResolvedItemName::Item {
1190 id,
1191 full_name: FullItemName { item, .. },
1192 ..
1193 } = secret
1194 else {
1195 return Err(PlanError::InvalidSecret(Box::new(secret)));
1196 };
1197
1198 let name = if let Some(alias) = alias {
1200 alias.into_string()
1201 } else {
1202 item
1203 };
1204 column_names.push(name);
1205
1206 let column_idx = column_typs.len() - 1;
1209 assert_eq!(
1211 column_idx,
1212 column_names.len() - 1,
1213 "column names and types don't match"
1214 );
1215
1216 validation_secrets.push(WebhookValidationSecret {
1217 id,
1218 column_idx,
1219 use_bytes,
1220 });
1221 }
1222
1223 let relation_typ = SqlRelationType::new(column_typs);
1224 let desc = RelationDesc::new(relation_typ, column_names.clone());
1225 let scope = Scope::from_source(None, column_names);
1226
1227 transform_ast::transform(scx, &mut expr)?;
1228
1229 let ecx = &ExprContext {
1230 qcx: &qcx,
1231 name: "CHECK",
1232 scope: &scope,
1233 relation_type: desc.typ(),
1234 allow_aggregates: false,
1235 allow_subqueries: false,
1236 allow_parameters: false,
1237 allow_windows: false,
1238 };
1239 let expr = plan_expr(ecx, &expr)?
1240 .type_as(ecx, &SqlScalarType::Bool)?
1241 .lower_uncorrelated(scx.catalog.system_vars())?;
1242 let validation = WebhookValidation {
1243 expression: expr,
1244 relation_desc: desc,
1245 bodies: body_tuples,
1246 headers: header_tuples,
1247 secrets: validation_secrets,
1248 };
1249 Ok(validation)
1250}
1251
1252pub fn plan_default_expr(
1253 scx: &StatementContext,
1254 expr: &Expr<Aug>,
1255 target_ty: &SqlScalarType,
1256) -> Result<HirScalarExpr, PlanError> {
1257 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1258 let ecx = &ExprContext {
1259 qcx: &qcx,
1260 name: "DEFAULT expression",
1261 scope: &Scope::empty(),
1262 relation_type: &SqlRelationType::empty(),
1263 allow_aggregates: false,
1264 allow_subqueries: false,
1265 allow_parameters: false,
1266 allow_windows: false,
1267 };
1268 let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1269 Ok(hir)
1270}
1271
1272pub fn plan_params<'a>(
1273 scx: &'a StatementContext,
1274 params: Vec<Expr<Aug>>,
1275 desc: &StatementDesc,
1276) -> Result<Params, PlanError> {
1277 if params.len() != desc.param_types.len() {
1278 sql_bail!(
1279 "expected {} params, got {}",
1280 desc.param_types.len(),
1281 params.len()
1282 );
1283 }
1284
1285 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1286
1287 let mut datums = Row::default();
1288 let mut packer = datums.packer();
1289 let mut actual_types = Vec::new();
1290 let temp_storage = &RowArena::new();
1291 for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1292 transform_ast::transform(scx, &mut expr)?;
1293
1294 let ecx = execute_expr_context(&qcx);
1295 let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1296 let actual_ty = ecx.scalar_type(&ex);
1297 if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1298 return Err(PlanError::WrongParameterType(
1299 i + 1,
1300 ecx.humanize_sql_scalar_type(expected_ty, false),
1301 ecx.humanize_sql_scalar_type(&actual_ty, false),
1302 ));
1303 }
1304 let ex = ex.lower_uncorrelated(scx.catalog.system_vars())?;
1305 let evaled = ex.eval(&[], temp_storage)?;
1306 packer.push(evaled);
1307 actual_types.push(actual_ty);
1308 }
1309 Ok(Params {
1310 datums,
1311 execute_types: actual_types,
1312 expected_types: desc.param_types.clone(),
1313 })
1314}
1315
1316static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
1317static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1318
1319pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1321 ExprContext {
1322 qcx,
1323 name: "EXECUTE",
1324 scope: &EXECUTE_CONTEXT_SCOPE,
1325 relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1326 allow_aggregates: false,
1327 allow_subqueries: false,
1328 allow_parameters: false,
1329 allow_windows: false,
1330 }
1331}
1332
1333pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
1338 LazyLock::new(|| CastContext::Assignment);
1339
1340pub fn plan_index_exprs<'a>(
1341 scx: &'a StatementContext,
1342 on_desc: &RelationDesc,
1343 exprs: Vec<Expr<Aug>>,
1344) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1345 let scope = Scope::from_source(None, on_desc.iter_names());
1346 let qcx = QueryContext::root(scx, QueryLifetime::Index);
1347
1348 let ecx = &ExprContext {
1349 qcx: &qcx,
1350 name: "CREATE INDEX",
1351 scope: &scope,
1352 relation_type: on_desc.typ(),
1353 allow_aggregates: false,
1354 allow_subqueries: false,
1355 allow_parameters: false,
1356 allow_windows: false,
1357 };
1358 let repr_col_types: Vec<ReprColumnType> = on_desc
1359 .typ()
1360 .column_types
1361 .iter()
1362 .map(ReprColumnType::from)
1363 .collect();
1364 let mut out = vec![];
1365 for mut expr in exprs {
1366 transform_ast::transform(scx, &mut expr)?;
1367 let expr = plan_expr_or_col_index(ecx, &expr)?;
1368 let mut expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
1369 expr.reduce(&repr_col_types);
1370 out.push(expr);
1371 }
1372 Ok(out)
1373}
1374
1375fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1376 match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1377 Some(column) => Ok(HirScalarExpr::column(column)),
1378 _ => plan_expr(ecx, e)?.type_as_any(ecx),
1379 }
1380}
1381
1382fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1383 match e {
1384 Expr::Value(Value::Number(n)) => {
1385 let n = n.parse::<usize>().map_err(|e| {
1386 sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1387 })?;
1388 if n < 1 || n > max {
1389 sql_bail!(
1390 "column reference {} in {} is out of range (1 - {})",
1391 n,
1392 name,
1393 max
1394 );
1395 }
1396 Ok(Some(n - 1))
1397 }
1398 _ => Ok(None),
1399 }
1400}
1401
1402struct PlannedQuery {
1403 expr: HirRelationExpr,
1404 scope: Scope,
1405 order_by: Vec<ColumnOrder>,
1406 limit: Option<HirScalarExpr>,
1407 offset: HirScalarExpr,
1413 project: Vec<usize>,
1414 group_size_hints: GroupSizeHints,
1415}
1416
1417fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1418 qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1419}
1420
1421fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1422 let cte_bindings = plan_ctes(qcx, q)?;
1425
1426 let limit = match &q.limit {
1427 None => None,
1428 Some(Limit {
1429 quantity,
1430 with_ties: false,
1431 }) => {
1432 let ecx = &ExprContext {
1433 qcx,
1434 name: "LIMIT",
1435 scope: &Scope::empty(),
1436 relation_type: &SqlRelationType::empty(),
1437 allow_aggregates: false,
1438 allow_subqueries: true,
1439 allow_parameters: true,
1440 allow_windows: false,
1441 };
1442 let limit = plan_expr(ecx, quantity)?;
1443 let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1444
1445 let limit = if limit.is_constant() {
1446 let arena = RowArena::new();
1447 let limit = limit.lower_uncorrelated(qcx.scx.catalog.system_vars())?;
1448
1449 match limit.eval(&[], &arena)? {
1453 d @ Datum::Int64(v) if v >= 0 => {
1454 HirScalarExpr::literal(d, SqlScalarType::Int64)
1455 }
1456 d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1457 Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1458 _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1459 }
1460 } else {
1461 qcx.scx
1463 .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1464 limit
1465 };
1466
1467 Some(limit)
1468 }
1469 Some(Limit {
1470 quantity: _,
1471 with_ties: true,
1472 }) => bail_unsupported!("FETCH ... WITH TIES"),
1473 };
1474
1475 let offset = match &q.offset {
1476 None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1477 Some(offset) => {
1478 let ecx = &ExprContext {
1479 qcx,
1480 name: "OFFSET",
1481 scope: &Scope::empty(),
1482 relation_type: &SqlRelationType::empty(),
1483 allow_aggregates: false,
1484 allow_subqueries: false,
1485 allow_parameters: true,
1486 allow_windows: false,
1487 };
1488 let offset = plan_expr(ecx, offset)?;
1489 let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1490
1491 let offset = if offset.is_constant() {
1492 let offset_value = offset_into_value(offset)?;
1494 HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1495 } else {
1496 if !offset.contains_parameters() {
1500 return Err(PlanError::InvalidOffset(format!(
1501 "must be simplifiable to a constant, possibly after parameter binding, got {}",
1502 offset
1503 )));
1504 }
1505 offset
1506 };
1507 offset
1508 }
1509 };
1510
1511 let mut planned_query = match &q.body {
1512 SetExpr::Select(s) => {
1513 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1515 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1516
1517 let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1518 PlannedQuery {
1519 expr: plan.expr,
1520 scope: plan.scope,
1521 order_by: plan.order_by,
1522 project: plan.project,
1523 limit,
1524 offset,
1525 group_size_hints,
1526 }
1527 }
1528 _ => {
1529 let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1530 let ecx = &ExprContext {
1531 qcx,
1532 name: "ORDER BY clause of a set expression",
1533 scope: &scope,
1534 relation_type: &qcx.relation_type(&expr),
1535 allow_aggregates: false,
1536 allow_subqueries: true,
1537 allow_parameters: true,
1538 allow_windows: false,
1539 };
1540 let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1541 let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1542 let project = (0..ecx.relation_type.arity()).collect();
1543 PlannedQuery {
1544 expr: expr.map(map_exprs),
1545 scope,
1546 order_by,
1547 limit,
1548 project,
1549 offset,
1550 group_size_hints: GroupSizeHints::default(),
1551 }
1552 }
1553 };
1554
1555 match &q.ctes {
1557 CteBlock::Simple(_) => {
1558 for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1559 if let Some(cte) = qcx.ctes.remove(&id) {
1560 planned_query.expr = HirRelationExpr::Let {
1561 name: cte.name,
1562 id: id.clone(),
1563 value: Box::new(value),
1564 body: Box::new(planned_query.expr),
1565 };
1566 }
1567 if let Some(shadowed_val) = shadowed_val {
1568 qcx.ctes.insert(id, shadowed_val);
1569 }
1570 }
1571 }
1572 CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1573 let MutRecBlockOptionExtracted {
1574 recursion_limit,
1575 return_at_recursion_limit,
1576 error_at_recursion_limit,
1577 seen: _,
1578 } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1579 let limit = match (
1580 recursion_limit,
1581 return_at_recursion_limit,
1582 error_at_recursion_limit,
1583 ) {
1584 (None, None, None) => None,
1585 (Some(max_iters), None, None) => {
1586 Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT))
1587 }
1588 (None, Some(max_iters), None) => Some((max_iters, true)),
1589 (None, None, Some(max_iters)) => Some((max_iters, false)),
1590 _ => {
1591 return Err(InvalidWmrRecursionLimit(
1592 "More than one recursion limit given. \
1593 Please give at most one of RECURSION LIMIT, \
1594 ERROR AT RECURSION LIMIT, \
1595 RETURN AT RECURSION LIMIT."
1596 .to_owned(),
1597 ));
1598 }
1599 }
1600 .try_map(|(max_iters, return_at_limit)| {
1601 Ok::<LetRecLimit, PlanError>(LetRecLimit {
1602 max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit(
1603 "Recursion limit has to be greater than 0.".to_owned(),
1604 ))?,
1605 return_at_limit: *return_at_limit,
1606 })
1607 })?;
1608
1609 let mut bindings = Vec::new();
1610 for (id, value, shadowed_val) in cte_bindings.into_iter() {
1611 if let Some(cte) = qcx.ctes.remove(&id) {
1612 bindings.push((cte.name, id, value, cte.desc.into_typ()));
1613 }
1614 if let Some(shadowed_val) = shadowed_val {
1615 qcx.ctes.insert(id, shadowed_val);
1616 }
1617 }
1618 if !bindings.is_empty() {
1619 planned_query.expr = HirRelationExpr::LetRec {
1620 limit,
1621 bindings,
1622 body: Box::new(planned_query.expr),
1623 }
1624 }
1625 }
1626 }
1627
1628 Ok(planned_query)
1629}
1630
1631pub(crate) fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1633 let offset = offset
1634 .try_into_literal_int64()
1635 .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1636 if offset < 0 {
1637 return Err(negative_offset_error(offset));
1638 }
1639 Ok(offset)
1640}
1641
1642pub(crate) fn negative_offset_error(offset: i64) -> PlanError {
1643 PlanError::InvalidOffset(format!("must not be negative, got {}", offset))
1644}
1645
1646generate_extracted_config!(
1647 MutRecBlockOption,
1648 (RecursionLimit, u64),
1649 (ReturnAtRecursionLimit, u64),
1650 (ErrorAtRecursionLimit, u64)
1651);
1652
1653pub fn plan_ctes(
1658 qcx: &mut QueryContext,
1659 q: &Query<Aug>,
1660) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1661 let mut result = Vec::new();
1663 let mut shadowed_descs = BTreeMap::new();
1666
1667 if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1669 sql_bail!(
1670 "WITH query name {} specified more than once",
1671 normalize::ident_ref(ident).quoted()
1672 )
1673 }
1674
1675 match &q.ctes {
1676 CteBlock::Simple(ctes) => {
1677 for cte in ctes.iter() {
1679 let cte_name = normalize::ident(cte.alias.name.clone());
1680 let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1681 let typ = qcx.relation_type(&val);
1682 let mut desc = RelationDesc::new(typ, scope.column_names());
1683 plan_utils::maybe_rename_columns(
1684 format!("CTE {}", cte.alias.name),
1685 &mut desc,
1686 &cte.alias.columns,
1687 )?;
1688 let shadowed = qcx.ctes.insert(
1690 cte.id,
1691 CteDesc {
1692 name: cte_name,
1693 desc,
1694 },
1695 );
1696
1697 result.push((cte.id, val, shadowed));
1698 }
1699 }
1700 CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1701 for cte in ctes.iter() {
1703 let cte_name = normalize::ident(cte.name.clone());
1704 let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1705 for column in cte.columns.iter() {
1706 desc_columns.push((
1707 normalize::column_name(column.name.clone()),
1708 SqlColumnType {
1709 scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1710 nullable: true,
1711 },
1712 ));
1713 }
1714 let desc = RelationDesc::from_names_and_types(desc_columns);
1715 let shadowed = qcx.ctes.insert(
1716 cte.id,
1717 CteDesc {
1718 name: cte_name,
1719 desc,
1720 },
1721 );
1722 if let Some(shadowed) = shadowed {
1724 shadowed_descs.insert(cte.id, shadowed);
1725 }
1726 }
1727
1728 for cte in ctes.iter() {
1730 let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1731
1732 let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1733
1734 if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1735 sql_bail!(
1738 "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1739 );
1740 }
1741
1742 if !proposed_typ.keys.is_empty() {
1743 sql_bail!("[internal error]: WMR CTEs do not support keys");
1746 }
1747
1748 let derived_typ = qcx.relation_type(&val);
1750
1751 let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1752 let cte_name = normalize::ident(cte.name.clone());
1753 let proposed_typ = proposed_typ
1754 .column_types
1755 .iter()
1756 .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1757 .collect::<Vec<_>>();
1758 let inferred_typ = derived_typ
1759 .column_types
1760 .iter()
1761 .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1762 .collect::<Vec<_>>();
1763 Err(PlanError::RecursiveTypeMismatch(
1764 cte_name,
1765 proposed_typ,
1766 inferred_typ,
1767 ))
1768 };
1769
1770 if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1771 return type_err(proposed_typ, derived_typ);
1772 }
1773
1774 let val = match cast_relation(
1776 qcx,
1777 CastContext::Assignment,
1782 val,
1783 proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1784 ) {
1785 Ok(val) => val,
1786 Err(_) => return type_err(proposed_typ, derived_typ),
1787 };
1788
1789 result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1790 }
1791 }
1792 }
1793
1794 Ok(result)
1795}
1796
1797pub fn plan_nested_query(
1798 qcx: &mut QueryContext,
1799 q: &Query<Aug>,
1800) -> Result<(HirRelationExpr, Scope), PlanError> {
1801 let PlannedQuery {
1802 mut expr,
1803 scope,
1804 order_by,
1805 limit,
1806 offset,
1807 project,
1808 group_size_hints,
1809 } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1810 if limit.is_some()
1811 || !offset
1812 .clone()
1813 .try_into_literal_int64()
1814 .is_ok_and(|offset| offset == 0)
1815 {
1816 expr = HirRelationExpr::top_k(
1817 expr,
1818 vec![],
1819 order_by,
1820 limit,
1821 offset,
1822 group_size_hints.limit_input_group_size,
1823 );
1824 }
1825 Ok((expr.project(project), scope))
1826}
1827
1828fn plan_set_expr(
1829 qcx: &mut QueryContext,
1830 q: &SetExpr<Aug>,
1831) -> Result<(HirRelationExpr, Scope), PlanError> {
1832 match q {
1833 SetExpr::Select(select) => {
1834 let order_by_exprs = Vec::new();
1835 let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
1836 assert!(plan.order_by.is_empty());
1839 Ok((plan.expr.project(plan.project), plan.scope))
1840 }
1841 SetExpr::SetOperation {
1842 op,
1843 all,
1844 left,
1845 right,
1846 } => {
1847 let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
1849 let (right_expr, right_scope) =
1850 qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;
1851
1852 let left_type = qcx.relation_type(&left_expr);
1854 let right_type = qcx.relation_type(&right_expr);
1855 if left_type.arity() != right_type.arity() {
1856 sql_bail!(
1857 "each {} query must have the same number of columns: {} vs {}",
1858 op,
1859 left_type.arity(),
1860 right_type.arity(),
1861 );
1862 }
1863
1864 let left_ecx = &ExprContext {
1869 qcx,
1870 name: &op.to_string(),
1871 scope: &left_scope,
1872 relation_type: &left_type,
1873 allow_aggregates: false,
1874 allow_subqueries: false,
1875 allow_parameters: false,
1876 allow_windows: false,
1877 };
1878 let right_ecx = &ExprContext {
1879 qcx,
1880 name: &op.to_string(),
1881 scope: &right_scope,
1882 relation_type: &right_type,
1883 allow_aggregates: false,
1884 allow_subqueries: false,
1885 allow_parameters: false,
1886 allow_windows: false,
1887 };
1888 let mut left_casts = vec![];
1889 let mut right_casts = vec![];
1890 for (i, (left_type, right_type)) in left_type
1891 .column_types
1892 .iter()
1893 .zip_eq(right_type.column_types.iter())
1894 .enumerate()
1895 {
1896 let types = &[
1897 CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
1898 CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
1899 ];
1900 let target =
1901 typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
1902 match typeconv::plan_cast(
1903 left_ecx,
1904 CastContext::Implicit,
1905 HirScalarExpr::column(i),
1906 &target,
1907 ) {
1908 Ok(expr) => left_casts.push(expr),
1909 Err(_) => sql_bail!(
1910 "{} types {} and {} cannot be matched",
1911 op,
1912 qcx.humanize_sql_scalar_type(&left_type.scalar_type, false),
1913 qcx.humanize_sql_scalar_type(&target, false),
1914 ),
1915 }
1916 match typeconv::plan_cast(
1917 right_ecx,
1918 CastContext::Implicit,
1919 HirScalarExpr::column(i),
1920 &target,
1921 ) {
1922 Ok(expr) => right_casts.push(expr),
1923 Err(_) => sql_bail!(
1924 "{} types {} and {} cannot be matched",
1925 op,
1926 qcx.humanize_sql_scalar_type(&target, false),
1927 qcx.humanize_sql_scalar_type(&right_type.scalar_type, false),
1928 ),
1929 }
1930 }
1931 let lhs = if left_casts
1932 .iter()
1933 .enumerate()
1934 .any(|(i, e)| e != &HirScalarExpr::column(i))
1935 {
1936 let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
1937 left_expr.map(left_casts).project(project_key)
1938 } else {
1939 left_expr
1940 };
1941 let rhs = if right_casts
1942 .iter()
1943 .enumerate()
1944 .any(|(i, e)| e != &HirScalarExpr::column(i))
1945 {
1946 let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
1947 right_expr.map(right_casts).project(project_key)
1948 } else {
1949 right_expr
1950 };
1951
1952 let relation_expr = match op {
1953 SetOperator::Union => {
1954 if *all {
1955 lhs.union(rhs)
1956 } else {
1957 lhs.union(rhs).distinct()
1958 }
1959 }
1960 SetOperator::Except => Hir::except(all, lhs, rhs),
1961 SetOperator::Intersect => {
1962 let left_clone = lhs.clone();
1968 if *all {
1969 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1970 } else {
1971 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1972 .distinct()
1973 }
1974 }
1975 };
1976 let scope = Scope::from_source(
1977 None,
1978 left_scope.column_names(),
1980 );
1981
1982 Ok((relation_expr, scope))
1983 }
1984 SetExpr::Values(Values(values)) => plan_values(qcx, values),
1985 SetExpr::Table(name) => {
1986 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
1987 Ok((expr, scope))
1988 }
1989 SetExpr::Query(query) => {
1990 let (expr, scope) = plan_nested_query(qcx, query)?;
1991 Ok((expr, scope))
1992 }
1993 SetExpr::Show(stmt) => {
1994 if !qcx.lifetime.allow_show() {
2008 return Err(PlanError::ShowCommandInView);
2009 }
2010
2011 fn to_hirscope(
2014 plan: ShowCreatePlan,
2015 desc: StatementDesc,
2016 ) -> Result<(HirRelationExpr, Scope), PlanError> {
2017 let rows = vec![plan.row.iter().collect::<Vec<_>>()];
2018 let desc = desc.relation_desc.ok_or_else(|| {
2019 internal_err!("statement description missing relation descriptor")
2020 })?;
2021 let scope = Scope::from_source(None, desc.iter_names());
2022 let expr = HirRelationExpr::constant(rows, desc.into_typ());
2023 Ok((expr, scope))
2024 }
2025
2026 match stmt.clone() {
2027 ShowStatement::ShowColumns(stmt) => {
2028 show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
2029 }
2030 ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
2031 show::plan_show_create_connection(qcx.scx, stmt.clone())?,
2032 show::describe_show_create_connection(qcx.scx, stmt)?,
2033 ),
2034 ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
2035 show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
2036 show::describe_show_create_cluster(qcx.scx, stmt)?,
2037 ),
2038 ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
2039 show::plan_show_create_index(qcx.scx, stmt.clone())?,
2040 show::describe_show_create_index(qcx.scx, stmt)?,
2041 ),
2042 ShowStatement::ShowCreateSink(stmt) => to_hirscope(
2043 show::plan_show_create_sink(qcx.scx, stmt.clone())?,
2044 show::describe_show_create_sink(qcx.scx, stmt)?,
2045 ),
2046 ShowStatement::ShowCreateSource(stmt) => to_hirscope(
2047 show::plan_show_create_source(qcx.scx, stmt.clone())?,
2048 show::describe_show_create_source(qcx.scx, stmt)?,
2049 ),
2050 ShowStatement::ShowCreateTable(stmt) => to_hirscope(
2051 show::plan_show_create_table(qcx.scx, stmt.clone())?,
2052 show::describe_show_create_table(qcx.scx, stmt)?,
2053 ),
2054 ShowStatement::ShowCreateView(stmt) => to_hirscope(
2055 show::plan_show_create_view(qcx.scx, stmt.clone())?,
2056 show::describe_show_create_view(qcx.scx, stmt)?,
2057 ),
2058 ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
2059 show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
2060 show::describe_show_create_materialized_view(qcx.scx, stmt)?,
2061 ),
2062 ShowStatement::ShowCreateType(stmt) => to_hirscope(
2063 show::plan_show_create_type(qcx.scx, stmt.clone())?,
2064 show::describe_show_create_type(qcx.scx, stmt)?,
2065 ),
2066 ShowStatement::ShowObjects(stmt) => {
2067 show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
2068 }
2069 ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
2070 ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
2071 }
2072 }
2073 }
2074}
2075
2076fn plan_values(
2078 qcx: &QueryContext,
2079 values: &[Vec<Expr<Aug>>],
2080) -> Result<(HirRelationExpr, Scope), PlanError> {
2081 assert!(!values.is_empty());
2082
2083 let ecx = &ExprContext {
2084 qcx,
2085 name: "VALUES",
2086 scope: &Scope::empty(),
2087 relation_type: &SqlRelationType::empty(),
2088 allow_aggregates: false,
2089 allow_subqueries: true,
2090 allow_parameters: true,
2091 allow_windows: false,
2092 };
2093
2094 let ncols = values[0].len();
2095 let nrows = values.len();
2096
2097 let mut cols = vec![vec![]; ncols];
2100 for row in values {
2101 if row.len() != ncols {
2102 sql_bail!(
2103 "VALUES expression has varying number of columns: {} vs {}",
2104 row.len(),
2105 ncols
2106 );
2107 }
2108 for (i, v) in row.iter().enumerate() {
2109 cols[i].push(v);
2110 }
2111 }
2112
2113 let mut col_iters = Vec::with_capacity(ncols);
2115 let mut col_types = Vec::with_capacity(ncols);
2116 for col in &cols {
2117 let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2118 let mut col_type = ecx.column_type(&col[0]);
2119 for val in &col[1..] {
2120 col_type = col_type.sql_union(&ecx.column_type(val))?; }
2122 col_types.push(col_type);
2123 col_iters.push(col.into_iter());
2124 }
2125
2126 let mut exprs = vec![];
2128 for _ in 0..nrows {
2129 for i in 0..ncols {
2130 exprs.push(col_iters[i].next().unwrap());
2131 }
2132 }
2133 let out = HirRelationExpr::CallTable {
2134 func: TableFunc::Wrap {
2135 width: ncols,
2136 types: col_types,
2137 },
2138 exprs,
2139 };
2140
2141 let mut scope = Scope::empty();
2143 for i in 0..ncols {
2144 let name = format!("column{}", i + 1);
2145 scope.items.push(ScopeItem::from_column_name(name));
2146 }
2147
2148 Ok((out, scope))
2149}
2150
2151fn plan_values_insert(
2161 qcx: &QueryContext,
2162 target_names: &[&ColumnName],
2163 target_types: &[&SqlScalarType],
2164 values: &[Vec<Expr<Aug>>],
2165) -> Result<HirRelationExpr, PlanError> {
2166 assert!(!values.is_empty());
2167
2168 if !values.iter().map(|row| row.len()).all_equal() {
2169 sql_bail!("VALUES lists must all be the same length");
2170 }
2171
2172 let ecx = &ExprContext {
2173 qcx,
2174 name: "VALUES",
2175 scope: &Scope::empty(),
2176 relation_type: &SqlRelationType::empty(),
2177 allow_aggregates: false,
2178 allow_subqueries: true,
2179 allow_parameters: true,
2180 allow_windows: false,
2181 };
2182
2183 let mut exprs = vec![];
2184 let mut types = vec![];
2185 for row in values {
2186 if row.len() > target_names.len() {
2187 sql_bail!("INSERT has more expressions than target columns");
2188 }
2189 for (column, val) in row.into_iter().enumerate() {
2190 let target_type = &target_types[column];
2191 let val = plan_expr(ecx, val)?;
2192 let val = typeconv::plan_coerce(ecx, val, target_type)?;
2193 let source_type = &ecx.scalar_type(&val);
2194 let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2195 Ok(val) => val,
2196 Err(_) => sql_bail!(
2197 "column {} is of type {} but expression is of type {}",
2198 target_names[column].quoted(),
2199 qcx.humanize_sql_scalar_type(target_type, false),
2200 qcx.humanize_sql_scalar_type(source_type, false),
2201 ),
2202 };
2203 if column >= types.len() {
2204 types.push(ecx.column_type(&val));
2205 } else {
2206 types[column] = types[column].sql_union(&ecx.column_type(&val))?; }
2208 exprs.push(val);
2209 }
2210 }
2211
2212 Ok(HirRelationExpr::CallTable {
2213 func: TableFunc::Wrap {
2214 width: values[0].len(),
2215 types,
2216 },
2217 exprs,
2218 })
2219}
2220
2221fn plan_join_identity() -> (HirRelationExpr, Scope) {
2222 let typ = SqlRelationType::new(vec![]);
2223 let expr = HirRelationExpr::constant(vec![vec![]], typ);
2224 let scope = Scope::empty();
2225 (expr, scope)
2226}
2227
2228#[derive(Debug)]
2234struct SelectPlan {
2235 expr: HirRelationExpr,
2236 scope: Scope,
2237 order_by: Vec<ColumnOrder>,
2238 project: Vec<usize>,
2239}
2240
2241generate_extracted_config!(
2242 SelectOption,
2243 (ExpectedGroupSize, u64),
2244 (AggregateInputGroupSize, u64),
2245 (DistinctOnInputGroupSize, u64),
2246 (LimitInputGroupSize, u64)
2247);
2248
2249fn plan_select_from_where(
2267 qcx: &QueryContext,
2268 mut s: Select<Aug>,
2269 mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2270) -> Result<SelectPlan, PlanError> {
2271 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2278 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2279
2280 let (mut relation_expr, mut from_scope) =
2282 s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2283 let (left, left_scope) = l;
2284 plan_join(
2285 qcx,
2286 left,
2287 left_scope,
2288 &Join {
2289 relation: TableFactor::NestedJoin {
2290 join: Box::new(twj.clone()),
2291 alias: None,
2292 },
2293 join_operator: JoinOperator::CrossJoin,
2294 },
2295 )
2296 })?;
2297
2298 if let Some(selection) = &s.selection {
2300 let ecx = &ExprContext {
2301 qcx,
2302 name: "WHERE clause",
2303 scope: &from_scope,
2304 relation_type: &qcx.relation_type(&relation_expr),
2305 allow_aggregates: false,
2306 allow_subqueries: true,
2307 allow_parameters: true,
2308 allow_windows: false,
2309 };
2310 let expr = plan_expr(ecx, selection)
2311 .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2312 .type_as(ecx, &SqlScalarType::Bool)?;
2313 relation_expr = relation_expr.filter(vec![expr]);
2314 }
2315
2316 let (aggregates, table_funcs) = {
2319 let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2320 visitor.visit_select_mut(&mut s);
2321 for o in order_by_exprs.iter_mut() {
2322 visitor.visit_order_by_expr_mut(o);
2323 }
2324 visitor.into_result()?
2325 };
2326 let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2327 if !table_funcs.is_empty() {
2328 let (expr, scope) = plan_scalar_table_funcs(
2329 qcx,
2330 table_funcs,
2331 &mut table_func_names,
2332 &relation_expr,
2333 &from_scope,
2334 )?;
2335 relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2336 from_scope = from_scope.product(scope)?;
2337 }
2338
2339 let projection = {
2341 let ecx = &ExprContext {
2342 qcx,
2343 name: "SELECT clause",
2344 scope: &from_scope,
2345 relation_type: &qcx.relation_type(&relation_expr),
2346 allow_aggregates: true,
2347 allow_subqueries: true,
2348 allow_parameters: true,
2349 allow_windows: true,
2350 };
2351 let mut out = vec![];
2352 for si in &s.projection {
2353 if *si == SelectItem::Wildcard && s.from.is_empty() {
2354 sql_bail!("SELECT * with no tables specified is not valid");
2355 }
2356 out.extend(expand_select_item(ecx, si, &table_func_names)?);
2357 }
2358 out
2359 };
2360
2361 let (mut group_scope, select_all_mapping) = {
2365 let ecx = &ExprContext {
2367 qcx,
2368 name: "GROUP BY clause",
2369 scope: &from_scope,
2370 relation_type: &qcx.relation_type(&relation_expr),
2371 allow_aggregates: false,
2372 allow_subqueries: true,
2373 allow_parameters: true,
2374 allow_windows: false,
2375 };
2376 let mut group_key = vec![];
2377 let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2378 let mut group_hir_exprs = vec![];
2379 let mut group_scope = Scope::empty();
2380 let mut select_all_mapping = BTreeMap::new();
2381
2382 for group_expr in &s.group_by {
2383 let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2384 let new_column = group_key.len();
2385
2386 if let Some(group_expr) = group_expr {
2387 if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2391 existing_scope_item.exprs.insert(group_expr.clone());
2392 continue;
2393 }
2394 }
2395
2396 let mut scope_item = if let HirScalarExpr::Column(
2397 ColumnRef {
2398 level: 0,
2399 column: old_column,
2400 },
2401 _name,
2402 ) = &expr
2403 {
2404 select_all_mapping.insert(*old_column, new_column);
2410 let scope_item = ecx.scope.items[*old_column].clone();
2411 scope_item
2412 } else {
2413 ScopeItem::empty()
2414 };
2415
2416 if let Some(group_expr) = group_expr.cloned() {
2417 scope_item.exprs.insert(group_expr);
2418 }
2419
2420 group_key.push(from_scope.len() + group_exprs.len());
2421 group_hir_exprs.push(expr.clone());
2422 group_exprs.insert(expr, scope_item);
2423 }
2424
2425 assert_eq!(group_hir_exprs.len(), group_exprs.len());
2426 for expr in &group_hir_exprs {
2427 if let Some(scope_item) = group_exprs.remove(expr) {
2428 group_scope.items.push(scope_item);
2429 }
2430 }
2431
2432 let ecx = &ExprContext {
2434 qcx,
2435 name: "aggregate function",
2436 scope: &from_scope,
2437 relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2438 allow_aggregates: false,
2439 allow_subqueries: true,
2440 allow_parameters: true,
2441 allow_windows: false,
2442 };
2443 let mut agg_exprs = vec![];
2444 for sql_function in aggregates {
2445 if sql_function.over.is_some() {
2446 unreachable!(
2447 "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2448 );
2449 }
2450 agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2451 group_scope
2452 .items
2453 .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2454 }
2455 if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2456 relation_expr = relation_expr.map(group_hir_exprs).reduce(
2458 group_key,
2459 agg_exprs,
2460 group_size_hints.aggregate_input_group_size,
2461 );
2462
2463 for i in 0..from_scope.len() {
2469 if !select_all_mapping.contains_key(&i) {
2470 let scope_item = &ecx.scope.items[i];
2471 group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2472 table_name: scope_item.table_name.clone(),
2473 column_name: scope_item.column_name.clone(),
2474 allow_unqualified_references: scope_item.allow_unqualified_references,
2475 });
2476 }
2477 }
2478
2479 (group_scope, select_all_mapping)
2480 } else {
2481 (
2483 from_scope.clone(),
2484 (0..from_scope.len()).map(|i| (i, i)).collect(),
2485 )
2486 }
2487 };
2488
2489 if let Some(ref having) = s.having {
2491 let ecx = &ExprContext {
2492 qcx,
2493 name: "HAVING clause",
2494 scope: &group_scope,
2495 relation_type: &qcx.relation_type(&relation_expr),
2496 allow_aggregates: true,
2497 allow_subqueries: true,
2498 allow_parameters: true,
2499 allow_windows: false,
2500 };
2501 let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2502 relation_expr = relation_expr.filter(vec![expr]);
2503 }
2504
2505 let window_funcs = {
2518 let mut visitor = WindowFuncCollector::default();
2519 visitor.visit_select(&s);
2523 for o in order_by_exprs.iter() {
2524 visitor.visit_order_by_expr(o);
2525 }
2526 visitor.into_result()
2527 };
2528 for window_func in window_funcs {
2529 let ecx = &ExprContext {
2530 qcx,
2531 name: "window function",
2532 scope: &group_scope,
2533 relation_type: &qcx.relation_type(&relation_expr),
2534 allow_aggregates: true,
2535 allow_subqueries: true,
2536 allow_parameters: true,
2537 allow_windows: true,
2538 };
2539 relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2540 group_scope.items.push(ScopeItem::from_expr(window_func));
2541 }
2542 if let Some(ref qualify) = s.qualify {
2549 let ecx = &ExprContext {
2550 qcx,
2551 name: "QUALIFY clause",
2552 scope: &group_scope,
2553 relation_type: &qcx.relation_type(&relation_expr),
2554 allow_aggregates: true,
2555 allow_subqueries: true,
2556 allow_parameters: true,
2557 allow_windows: true,
2558 };
2559 let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2560 relation_expr = relation_expr.filter(vec![expr]);
2561 }
2562
2563 let output_columns = {
2565 let mut new_exprs = vec![];
2566 let mut new_type = qcx.relation_type(&relation_expr);
2567 let mut output_columns = vec![];
2568 for (select_item, column_name) in &projection {
2569 let ecx = &ExprContext {
2570 qcx,
2571 name: "SELECT clause",
2572 scope: &group_scope,
2573 relation_type: &new_type,
2574 allow_aggregates: true,
2575 allow_subqueries: true,
2576 allow_parameters: true,
2577 allow_windows: true,
2578 };
2579 let expr = match select_item {
2580 ExpandedSelectItem::InputOrdinal(i) => {
2581 if let Some(column) = select_all_mapping.get(i).copied() {
2582 HirScalarExpr::column(column)
2583 } else {
2584 return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2585 }
2586 }
2587 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2588 };
2589 if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2590 output_columns.push((column, column_name));
2592 } else {
2593 let typ = ecx.column_type(&expr);
2600 new_type.column_types.push(typ);
2601 new_exprs.push(expr);
2602 output_columns.push((group_scope.len(), column_name));
2603 group_scope
2604 .items
2605 .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2606 }
2607 }
2608 relation_expr = relation_expr.map(new_exprs);
2609 output_columns
2610 };
2611 let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2612
2613 let order_by = {
2615 let relation_type = qcx.relation_type(&relation_expr);
2616 let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2617 &ExprContext {
2618 qcx,
2619 name: "ORDER BY clause",
2620 scope: &group_scope,
2621 relation_type: &relation_type,
2622 allow_aggregates: true,
2623 allow_subqueries: true,
2624 allow_parameters: true,
2625 allow_windows: true,
2626 },
2627 &order_by_exprs,
2628 &output_columns,
2629 )?;
2630
2631 match s.distinct {
2632 None => relation_expr = relation_expr.map(map_exprs),
2633 Some(Distinct::EntireRow) => {
2634 if relation_type.arity() == 0 {
2635 sql_bail!("SELECT DISTINCT must have at least one column");
2636 }
2637 if !try_push_projection_order_by(
2641 &mut relation_expr,
2642 &mut project_key,
2643 &mut order_by,
2644 ) {
2645 sql_bail!(
2646 "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2647 );
2648 }
2649 assert!(map_exprs.is_empty());
2650 relation_expr = relation_expr.distinct();
2651 }
2652 Some(Distinct::On(exprs)) => {
2653 let ecx = &ExprContext {
2654 qcx,
2655 name: "DISTINCT ON clause",
2656 scope: &group_scope,
2657 relation_type: &qcx.relation_type(&relation_expr),
2658 allow_aggregates: true,
2659 allow_subqueries: true,
2660 allow_parameters: true,
2661 allow_windows: true,
2662 };
2663
2664 let mut distinct_exprs = vec![];
2665 for expr in &exprs {
2666 let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2667 distinct_exprs.push(expr);
2668 }
2669
2670 let mut distinct_key = vec![];
2671
2672 let arity = relation_type.arity();
2682 for ord in order_by.iter().take(distinct_exprs.len()) {
2683 let mut expr = &HirScalarExpr::column(ord.column);
2686 if ord.column >= arity {
2687 expr = &map_exprs[ord.column - arity];
2688 };
2689 match distinct_exprs.iter().position(move |e| e == expr) {
2690 None => sql_bail!(
2691 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2692 ),
2693 Some(pos) => {
2694 distinct_exprs.remove(pos);
2695 }
2696 }
2697 distinct_key.push(ord.column);
2698 }
2699
2700 for expr in distinct_exprs {
2702 let column = match expr {
2705 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2706 _ => {
2707 map_exprs.push(expr);
2708 arity + map_exprs.len() - 1
2709 }
2710 };
2711 distinct_key.push(column);
2712 }
2713
2714 let distinct_len = distinct_key.len();
2719 relation_expr = HirRelationExpr::top_k(
2720 relation_expr.map(map_exprs),
2721 distinct_key,
2722 order_by.iter().skip(distinct_len).cloned().collect(),
2723 Some(HirScalarExpr::literal(
2724 Datum::Int64(1),
2725 SqlScalarType::Int64,
2726 )),
2727 HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2728 group_size_hints.distinct_on_input_group_size,
2729 );
2730 }
2731 }
2732
2733 order_by
2734 };
2735
2736 let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2741
2742 Ok(SelectPlan {
2743 expr: relation_expr,
2744 scope,
2745 order_by,
2746 project: project_key,
2747 })
2748}
2749
2750fn plan_scalar_table_funcs(
2751 qcx: &QueryContext,
2752 table_funcs: BTreeMap<Function<Aug>, String>,
2753 table_func_names: &mut BTreeMap<String, Ident>,
2754 relation_expr: &HirRelationExpr,
2755 from_scope: &Scope,
2756) -> Result<(HirRelationExpr, Scope), PlanError> {
2757 let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2758
2759 for (table_func, id) in table_funcs.iter() {
2760 table_func_names.insert(
2761 id.clone(),
2762 Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2764 );
2765 }
2766 if table_funcs.len() == 1 {
2769 let (table_func, id) = table_funcs.iter().next().unwrap();
2770 let (expr, mut scope) =
2771 plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2772
2773 let num_cols = scope.len();
2775 for i in 0..scope.len() {
2776 scope.items[i].table_name = Some(PartialItemName {
2777 database: None,
2778 schema: None,
2779 item: id.clone(),
2780 });
2781 scope.items[i].from_single_column_function = num_cols == 1;
2782 scope.items[i].allow_unqualified_references = false;
2783 }
2784 return Ok((expr, scope));
2785 }
2786 if table_funcs.keys().any(is_repeat_row) {
2787 bail_unsupported!(format!(
2790 "{} in a SELECT clause with multiple table functions",
2791 REPEAT_ROW_NAME
2792 ));
2793 }
2794 let (expr, mut scope, num_cols) =
2796 plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2797
2798 let mut i = 0;
2800 for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2801 for _ in 0..num_cols {
2802 scope.items[i].table_name = Some(PartialItemName {
2803 database: None,
2804 schema: None,
2805 item: id.clone(),
2806 });
2807 scope.items[i].from_single_column_function = num_cols == 1;
2808 scope.items[i].allow_unqualified_references = false;
2809 i += 1;
2810 }
2811 scope.items[i].table_name = Some(PartialItemName {
2815 database: None,
2816 schema: None,
2817 item: id.clone(),
2818 });
2819 scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2820 scope.items[i].allow_unqualified_references = false;
2821 i += 1;
2822 }
2823 scope.items[i].allow_unqualified_references = false;
2825 Ok((expr, scope))
2826}
2827
2828fn plan_group_by_expr<'a>(
2835 ecx: &ExprContext,
2836 group_expr: &'a Expr<Aug>,
2837 projection: &'a [(ExpandedSelectItem, ColumnName)],
2838) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2839 let plan_projection = |column: usize| match &projection[column].0 {
2840 ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2841 ExpandedSelectItem::Expr(expr) => {
2842 Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2843 }
2844 };
2845
2846 if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2849 return plan_projection(column);
2850 }
2851
2852 match group_expr {
2856 Expr::Identifier(names) => match plan_identifier(ecx, names) {
2857 Err(PlanError::UnknownColumn {
2858 table: None,
2859 column,
2860 similar,
2861 }) => {
2862 let mut iter = projection.iter().map(|(_expr, name)| name);
2865 if let Some(i) = iter.position(|n| *n == column) {
2866 if iter.any(|n| *n == column) {
2867 Err(PlanError::AmbiguousColumn(column))
2868 } else {
2869 plan_projection(i)
2870 }
2871 } else {
2872 Err(PlanError::UnknownColumn {
2875 table: None,
2876 column,
2877 similar,
2878 })
2879 }
2880 }
2881 res => Ok((Some(group_expr), res?)),
2882 },
2883 _ => Ok((
2884 Some(group_expr),
2885 plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2886 )),
2887 }
2888}
2889
2890pub(crate) fn plan_order_by_exprs(
2898 ecx: &ExprContext,
2899 order_by_exprs: &[OrderByExpr<Aug>],
2900 output_columns: &[(usize, &ColumnName)],
2901) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2902 let mut order_by = vec![];
2903 let mut map_exprs = vec![];
2904 for obe in order_by_exprs {
2905 let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2906 let column = match expr {
2909 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2910 _ => {
2911 map_exprs.push(expr);
2912 ecx.relation_type.arity() + map_exprs.len() - 1
2913 }
2914 };
2915 order_by.push(resolve_desc_and_nulls_last(obe, column));
2916 }
2917 Ok((order_by, map_exprs))
2918}
2919
2920fn plan_order_by_or_distinct_expr(
2938 ecx: &ExprContext,
2939 expr: &Expr<Aug>,
2940 output_columns: &[(usize, &ColumnName)],
2941) -> Result<HirScalarExpr, PlanError> {
2942 if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2943 return Ok(HirScalarExpr::column(output_columns[i].0));
2944 }
2945
2946 if let Expr::Identifier(names) = expr {
2947 if let [name] = &names[..] {
2948 let name = normalize::column_name(name.clone());
2949 let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2950 if let Some((i, _)) = iter.next() {
2951 match iter.next() {
2952 Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2956 _ => return Ok(HirScalarExpr::column(*i)),
2957 }
2958 }
2959 }
2960 }
2961
2962 plan_expr(ecx, expr)?.type_as_any(ecx)
2963}
2964
2965fn plan_table_with_joins(
2966 qcx: &QueryContext,
2967 table_with_joins: &TableWithJoins<Aug>,
2968) -> Result<(HirRelationExpr, Scope), PlanError> {
2969 let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2970 for join in &table_with_joins.joins {
2971 let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2972 expr = new_expr;
2973 scope = new_scope;
2974 }
2975 Ok((expr, scope))
2976}
2977
2978fn plan_table_factor(
2979 qcx: &QueryContext,
2980 table_factor: &TableFactor<Aug>,
2981) -> Result<(HirRelationExpr, Scope), PlanError> {
2982 match table_factor {
2983 TableFactor::Table { name, alias } => {
2984 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2985 let scope = plan_table_alias(scope, alias.as_ref())?;
2986 Ok((expr, scope))
2987 }
2988
2989 TableFactor::Function {
2990 function,
2991 alias,
2992 with_ordinality,
2993 } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
2994
2995 TableFactor::RowsFrom {
2996 functions,
2997 alias,
2998 with_ordinality,
2999 } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
3000
3001 TableFactor::Derived {
3002 lateral,
3003 subquery,
3004 alias,
3005 } => {
3006 let mut qcx = (*qcx).clone();
3007 if !lateral {
3008 for scope in &mut qcx.outer_scopes {
3012 if scope.lateral_barrier {
3013 break;
3014 }
3015 scope.items.clear();
3016 }
3017 }
3018 qcx.outer_scopes[0].lateral_barrier = true;
3019 let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3020 let scope = plan_table_alias(scope, alias.as_ref())?;
3021 Ok((expr, scope))
3022 }
3023
3024 TableFactor::NestedJoin { join, alias } => {
3025 let (expr, scope) = plan_table_with_joins(qcx, join)?;
3026 let scope = plan_table_alias(scope, alias.as_ref())?;
3027 Ok((expr, scope))
3028 }
3029 }
3030}
3031
3032fn plan_rows_from(
3074 qcx: &QueryContext,
3075 functions: &[Function<Aug>],
3076 alias: Option<&TableAlias>,
3077 with_ordinality: bool,
3078) -> Result<(HirRelationExpr, Scope), PlanError> {
3079 if functions.iter().any(is_repeat_row) {
3081 bail_unsupported!(format!("{} in ROWS FROM", REPEAT_ROW_NAME));
3085 }
3086
3087 if let [function] = functions {
3090 return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3091 }
3092
3093 let (expr, mut scope, num_cols) = plan_rows_from_internal(
3097 qcx,
3098 functions,
3099 Some(functions[0].name.full_item_name().clone()),
3100 )?;
3101
3102 let mut columns = Vec::new();
3104 let mut offset = 0;
3105 for (idx, cols) in num_cols.into_iter().enumerate() {
3107 for i in 0..cols {
3108 columns.push(offset + i);
3109 }
3110 offset += cols + 1;
3111
3112 scope.items.remove(offset - idx - 1);
3115 }
3116
3117 if with_ordinality {
3120 columns.push(offset);
3121 } else {
3122 scope.items.pop();
3123 }
3124
3125 let expr = expr.project(columns);
3126
3127 let scope = plan_table_alias(scope, alias)?;
3128 Ok((expr, scope))
3129}
3130
3131fn is_repeat_row(f: &Function<Aug>) -> bool {
3132 f.name.full_name_str().as_str() == format!("{}.{}", MZ_CATALOG_SCHEMA, REPEAT_ROW_NAME)
3133}
3134
3135fn plan_rows_from_internal<'a>(
3158 qcx: &QueryContext,
3159 functions: impl IntoIterator<Item = &'a Function<Aug>>,
3160 table_name: Option<FullItemName>,
3161) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3162 let mut functions = functions.into_iter();
3163 let mut num_cols = Vec::new();
3164
3165 let (mut left_expr, mut left_scope) =
3169 plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3170 num_cols.push(left_scope.len() - 1);
3171 left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3173 left_scope
3174 .items
3175 .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));
3176
3177 for function in functions {
3178 let qcx = qcx.empty_derived_context();
3180 let (right_expr, mut right_scope) =
3181 plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3182 num_cols.push(right_scope.len() - 1);
3183 let left_col = left_scope.len() - 1;
3184 let right_col = left_scope.len() + right_scope.len() - 1;
3185 let on = HirScalarExpr::call_binary(
3186 HirScalarExpr::column(left_col),
3187 HirScalarExpr::column(right_col),
3188 expr_func::Eq,
3189 );
3190 left_expr = left_expr
3191 .join(right_expr, on, JoinKind::FullOuter)
3192 .map(vec![HirScalarExpr::call_variadic(
3193 Coalesce,
3194 vec![
3195 HirScalarExpr::column(left_col),
3196 HirScalarExpr::column(right_col),
3197 ],
3198 )]);
3199
3200 left_expr = left_expr.project(
3203 (0..left_col) .chain(left_col + 1..right_col + 2) .collect(),
3206 );
3207 right_scope.items.push(left_scope.items.pop().unwrap());
3209
3210 left_scope.items.extend(right_scope.items);
3211 }
3212
3213 Ok((left_expr, left_scope, num_cols))
3214}
3215
3216fn plan_solitary_table_function(
3220 qcx: &QueryContext,
3221 function: &Function<Aug>,
3222 alias: Option<&TableAlias>,
3223 with_ordinality: bool,
3224) -> Result<(HirRelationExpr, Scope), PlanError> {
3225 let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3226
3227 let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3228 if single_column_function {
3229 let item = &mut scope.items[0];
3230
3231 item.from_single_column_function = true;
3234
3235 if let Some(alias) = alias {
3250 if let ScopeItem {
3251 table_name: Some(table_name),
3252 column_name,
3253 ..
3254 } = item
3255 {
3256 if table_name.item.as_str() == column_name.as_str() {
3257 *column_name = normalize::column_name(alias.name.clone());
3258 }
3259 }
3260 }
3261 }
3262
3263 let scope = plan_table_alias(scope, alias)?;
3264 Ok((expr, scope))
3265}
3266
3267fn plan_table_function_internal(
3272 qcx: &QueryContext,
3273 Function {
3274 name,
3275 args,
3276 filter,
3277 over,
3278 distinct,
3279 }: &Function<Aug>,
3280 with_ordinality: bool,
3281 table_name: Option<FullItemName>,
3282) -> Result<(HirRelationExpr, Scope), PlanError> {
3283 assert_none!(filter, "cannot parse table function with FILTER");
3284 assert_none!(over, "cannot parse table function with OVER");
3285 assert!(!*distinct, "cannot parse table function with DISTINCT");
3286
3287 let ecx = &ExprContext {
3288 qcx,
3289 name: "table function arguments",
3290 scope: &Scope::empty(),
3291 relation_type: &SqlRelationType::empty(),
3292 allow_aggregates: false,
3293 allow_subqueries: true,
3294 allow_parameters: true,
3295 allow_windows: false,
3296 };
3297
3298 let scalar_args = match args {
3299 FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3300 FunctionArgs::Args { args, order_by } => {
3301 if !order_by.is_empty() {
3302 sql_bail!(
3303 "ORDER BY specified, but {} is not an aggregate function",
3304 name
3305 );
3306 }
3307 plan_exprs(ecx, args)?
3308 }
3309 };
3310
3311 let table_name = match table_name {
3312 Some(table_name) => table_name.item,
3313 None => name.full_item_name().item.clone(),
3314 };
3315
3316 let scope_name = Some(PartialItemName {
3317 database: None,
3318 schema: None,
3319 item: table_name,
3320 });
3321
3322 let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3323 Func::Table(impls) => {
3324 let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3325 let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3326 let expr = match tf.imp {
3327 TableFuncImpl::CallTable { mut func, exprs } => {
3328 if with_ordinality {
3329 func = TableFunc::with_ordinality(func.clone()).ok_or(
3330 PlanError::Unsupported {
3331 feature: format!("WITH ORDINALITY on {}", func),
3332 discussion_no: None,
3333 },
3334 )?;
3335 }
3336 HirRelationExpr::CallTable { func, exprs }
3337 }
3338 TableFuncImpl::Expr(expr) => {
3339 if !with_ordinality {
3340 expr
3341 } else {
3342 if qcx
3346 .scx
3347 .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3348 {
3349 tracing::error!(
3353 %name,
3354 "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3355 );
3356 expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3357 func: WindowExprType::Scalar(ScalarWindowExpr {
3358 func: ScalarWindowFunc::RowNumber,
3359 order_by: vec![],
3360 }),
3361 partition_by: vec![],
3362 order_by: vec![],
3363 })])
3364 } else {
3365 bail_unsupported!(format!(
3366 "WITH ORDINALITY or ROWS FROM with {}",
3367 name
3368 ));
3369 }
3370 }
3371 }
3372 };
3373 (expr, scope)
3374 }
3375 Func::Scalar(impls) => {
3376 let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3377 let output = expr.typ(
3378 &qcx.outer_relation_types,
3379 &SqlRelationType::new(vec![]),
3380 &qcx.scx.param_types.borrow(),
3381 );
3382
3383 let relation = SqlRelationType::new(vec![output]);
3384
3385 let function_ident = Ident::new(name.full_item_name().item.clone())?;
3386 let column_name = normalize::column_name(function_ident);
3387 let name = column_name.to_string();
3388
3389 let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3390
3391 let mut func = TableFunc::TabletizedScalar { relation, name };
3392 if with_ordinality {
3393 func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3394 feature: format!("WITH ORDINALITY on {}", func),
3395 discussion_no: None,
3396 })?;
3397 }
3398 (
3399 HirRelationExpr::CallTable {
3400 func,
3401 exprs: vec![expr],
3402 },
3403 scope,
3404 )
3405 }
3406 o => sql_bail!(
3407 "{} functions are not supported in functions in FROM",
3408 o.class()
3409 ),
3410 };
3411
3412 if with_ordinality {
3413 scope
3414 .items
3415 .push(ScopeItem::from_name(scope_name, "ordinality"));
3416 }
3417
3418 Ok((expr, scope))
3419}
3420
3421fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3422 if let Some(TableAlias {
3423 name,
3424 columns,
3425 strict,
3426 }) = alias
3427 {
3428 if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3429 sql_bail!(
3430 "{} has {} columns available but {} columns specified",
3431 name,
3432 scope.items.len(),
3433 columns.len()
3434 );
3435 }
3436
3437 let table_name = normalize::ident(name.to_owned());
3438 for (i, item) in scope.items.iter_mut().enumerate() {
3439 item.table_name = if item.allow_unqualified_references {
3440 Some(PartialItemName {
3441 database: None,
3442 schema: None,
3443 item: table_name.clone(),
3444 })
3445 } else {
3446 None
3480 };
3481 item.column_name = columns
3482 .get(i)
3483 .map(|a| normalize::column_name(a.clone()))
3484 .unwrap_or_else(|| item.column_name.clone());
3485 }
3486 }
3487 Ok(scope)
3488}
3489
3490fn invent_column_name(
3494 ecx: &ExprContext,
3495 expr: &Expr<Aug>,
3496 table_func_names: &BTreeMap<String, Ident>,
3497) -> Result<Option<ColumnName>, PlanError> {
3498 #[derive(Debug)]
3505 enum NameQuality {
3506 Low,
3507 High,
3508 }
3509
3510 fn invent(
3511 ecx: &ExprContext,
3512 expr: &Expr<Aug>,
3513 table_func_names: &BTreeMap<String, Ident>,
3514 ) -> Result<Option<(ColumnName, NameQuality)>, PlanError> {
3515 Ok(match expr {
3516 Expr::Identifier(names) => {
3517 if let [name] = names.as_slice() {
3518 if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3519 return Ok(Some((
3520 normalize::column_name(table_func_name.clone()),
3521 NameQuality::High,
3522 )));
3523 }
3524 }
3525 names
3526 .last()
3527 .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3528 }
3529 Expr::Value(v) => match v {
3530 Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3533 Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3534 _ => None,
3535 },
3536 Expr::Function(func) => {
3537 let (schema, item) = match &func.name {
3538 ResolvedItemName::Item {
3539 qualifiers,
3540 full_name,
3541 ..
3542 } => (&qualifiers.schema_spec, full_name.item.clone()),
3543 _ => {
3546 bail_internal!("function name did not resolve to an item: {:?}", func.name)
3547 }
3548 };
3549
3550 if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3551 || schema
3552 == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3553 {
3554 None
3555 } else {
3556 Some((item.into(), NameQuality::High))
3557 }
3558 }
3559 Expr::HomogenizingFunction { function, .. } => Some((
3560 function.to_string().to_lowercase().into(),
3561 NameQuality::High,
3562 )),
3563 Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3564 Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3565 Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3566 Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3567 Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names)? {
3568 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3569 _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3570 },
3571 Expr::Case { else_result, .. } => {
3572 let inner = match else_result.as_ref() {
3573 Some(else_result) => invent(ecx, else_result, table_func_names)?,
3574 None => None,
3575 };
3576 match inner {
3577 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3578 _ => Some(("case".into(), NameQuality::Low)),
3579 }
3580 }
3581 Expr::FieldAccess { field, .. } => {
3582 Some((normalize::column_name(field.clone()), NameQuality::High))
3583 }
3584 Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3585 Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names)?,
3586 Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3587 let Ok((_expr, scope)) = plan_nested_query(&mut ecx.derived_query_context(), query)
3595 else {
3596 return Ok(None);
3597 };
3598 scope
3599 .items
3600 .first()
3601 .map(|name| (name.column_name.clone(), NameQuality::High))
3602 }
3603 Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3604 _ => None,
3605 })
3606 }
3607
3608 Ok(invent(ecx, expr, table_func_names)?.map(|(name, _quality)| name))
3609}
3610
3611#[derive(Debug)]
3612enum ExpandedSelectItem<'a> {
3613 InputOrdinal(usize),
3614 Expr(Cow<'a, Expr<Aug>>),
3615}
3616
3617impl ExpandedSelectItem<'_> {
3618 fn as_expr(&self) -> Option<&Expr<Aug>> {
3619 match self {
3620 ExpandedSelectItem::InputOrdinal(_) => None,
3621 ExpandedSelectItem::Expr(expr) => Some(expr),
3622 }
3623 }
3624}
3625
3626fn expand_select_item<'a>(
3627 ecx: &ExprContext,
3628 s: &'a SelectItem<Aug>,
3629 table_func_names: &BTreeMap<String, Ident>,
3630) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3631 match s {
3632 SelectItem::Expr {
3633 expr: Expr::QualifiedWildcard(table_name),
3634 alias: _,
3635 } => {
3636 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3637 let table_name =
3638 normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3639 let out: Vec<_> = ecx
3640 .scope
3641 .items
3642 .iter()
3643 .enumerate()
3644 .filter(|(_i, item)| item.is_from_table(&table_name))
3645 .map(|(i, item)| {
3646 let name = item.column_name.clone();
3647 (ExpandedSelectItem::InputOrdinal(i), name)
3648 })
3649 .collect();
3650 if out.is_empty() {
3651 sql_bail!("no table named '{}' in scope", table_name);
3652 }
3653 Ok(out)
3654 }
3655 SelectItem::Expr {
3656 expr: Expr::WildcardAccess(sql_expr),
3657 alias: _,
3658 } => {
3659 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3660 let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3666 let fields = match ecx.scalar_type(&expr) {
3667 SqlScalarType::Record { fields, .. } => fields,
3668 ty => sql_bail!(
3669 "type {} is not composite",
3670 ecx.humanize_sql_scalar_type(&ty, false)
3671 ),
3672 };
3673 let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3674 if let Expr::Identifier(ident) = sql_expr.as_ref() {
3675 if let [name] = ident.as_slice() {
3676 if let Ok(items) = ecx.scope.items_from_table(
3677 &[],
3678 &PartialItemName {
3679 database: None,
3680 schema: None,
3681 item: name.as_str().to_string(),
3682 },
3683 ) {
3684 for (_, item) in items {
3685 if item
3686 .is_exists_column_for_a_table_function_that_was_in_the_target_list
3687 {
3688 skip_cols.insert(item.column_name.clone());
3689 }
3690 }
3691 }
3692 }
3693 }
3694 let items = fields
3695 .iter()
3696 .filter_map(|(name, _ty)| {
3697 if skip_cols.contains(name) {
3698 None
3699 } else {
3700 let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3701 expr: sql_expr.clone(),
3702 field: name.clone().into(),
3703 }));
3704 Some((item, name.clone()))
3705 }
3706 })
3707 .collect();
3708 Ok(items)
3709 }
3710 SelectItem::Wildcard => {
3711 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3712 let items: Vec<_> = ecx
3713 .scope
3714 .items
3715 .iter()
3716 .enumerate()
3717 .filter(|(_i, item)| item.allow_unqualified_references)
3718 .map(|(i, item)| {
3719 let name = item.column_name.clone();
3720 (ExpandedSelectItem::InputOrdinal(i), name)
3721 })
3722 .collect();
3723
3724 Ok(items)
3725 }
3726 SelectItem::Expr { expr, alias } => {
3727 let name = match alias.clone().map(normalize::column_name) {
3728 Some(name) => name,
3729 None => invent_column_name(ecx, expr, table_func_names)?
3730 .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into()),
3731 };
3732 Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3733 }
3734 }
3735}
3736
3737fn plan_join(
3738 left_qcx: &QueryContext,
3739 left: HirRelationExpr,
3740 left_scope: Scope,
3741 join: &Join<Aug>,
3742) -> Result<(HirRelationExpr, Scope), PlanError> {
3743 const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3744 let (kind, constraint) = match &join.join_operator {
3745 JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3746 JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3747 JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3748 JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3749 JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3750 };
3751
3752 let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3753 if !kind.can_be_correlated() {
3754 for item in &mut right_qcx.outer_scopes[0].items {
3755 item.error_if_referenced =
3760 Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3761 table: table.cloned(),
3762 column: column.clone(),
3763 });
3764 }
3765 }
3766 let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3767
3768 let (expr, scope) = match constraint {
3769 JoinConstraint::On(expr) => {
3770 let product_scope = left_scope.product(right_scope)?;
3771 let ecx = &ExprContext {
3772 qcx: left_qcx,
3773 name: "ON clause",
3774 scope: &product_scope,
3775 relation_type: &SqlRelationType::new(
3776 left_qcx
3777 .relation_type(&left)
3778 .column_types
3779 .into_iter()
3780 .chain(right_qcx.relation_type(&right).column_types)
3781 .collect(),
3782 ),
3783 allow_aggregates: false,
3784 allow_subqueries: true,
3785 allow_parameters: true,
3786 allow_windows: false,
3787 };
3788 let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3789 let joined = left.join(right, on, kind);
3790 (joined, product_scope)
3791 }
3792 JoinConstraint::Using { columns, alias } => {
3793 let column_names = columns
3794 .iter()
3795 .map(|ident| normalize::column_name(ident.clone()))
3796 .collect::<Vec<_>>();
3797
3798 plan_using_constraint(
3799 &column_names,
3800 left_qcx,
3801 left,
3802 left_scope,
3803 &right_qcx,
3804 right,
3805 right_scope,
3806 kind,
3807 alias.as_ref(),
3808 )?
3809 }
3810 JoinConstraint::Natural => {
3811 *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3814 *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3815 let left_column_names = left_scope.column_names();
3816 let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3817 let column_names: Vec<_> = left_column_names
3818 .filter(|col| right_column_names.contains(col))
3819 .cloned()
3820 .collect();
3821 plan_using_constraint(
3822 &column_names,
3823 left_qcx,
3824 left,
3825 left_scope,
3826 &right_qcx,
3827 right,
3828 right_scope,
3829 kind,
3830 None,
3831 )?
3832 }
3833 };
3834 Ok((expr, scope))
3835}
3836
3837#[allow(clippy::too_many_arguments)]
3839fn plan_using_constraint(
3840 column_names: &[ColumnName],
3841 left_qcx: &QueryContext,
3842 left: HirRelationExpr,
3843 left_scope: Scope,
3844 right_qcx: &QueryContext,
3845 right: HirRelationExpr,
3846 right_scope: Scope,
3847 kind: JoinKind,
3848 alias: Option<&Ident>,
3849) -> Result<(HirRelationExpr, Scope), PlanError> {
3850 let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3851
3852 let mut unique_column_names = BTreeSet::new();
3855 for c in column_names {
3856 if !unique_column_names.insert(c) {
3857 return Err(PlanError::Unsupported {
3858 feature: format!(
3859 "column name {} appears more than once in USING clause",
3860 c.quoted()
3861 ),
3862 discussion_no: None,
3863 });
3864 }
3865 }
3866
3867 let alias_item_name = alias.map(|alias| PartialItemName {
3868 database: None,
3869 schema: None,
3870 item: alias.clone().to_string(),
3871 });
3872
3873 if let Some(alias_item_name) = &alias_item_name {
3874 for partial_item_name in both_scope.table_names() {
3875 if partial_item_name.matches(alias_item_name) {
3876 sql_bail!(
3877 "table name \"{}\" specified more than once",
3878 alias_item_name
3879 )
3880 }
3881 }
3882 }
3883
3884 let ecx = &ExprContext {
3885 qcx: right_qcx,
3886 name: "USING clause",
3887 scope: &both_scope,
3888 relation_type: &SqlRelationType::new(
3889 left_qcx
3890 .relation_type(&left)
3891 .column_types
3892 .into_iter()
3893 .chain(right_qcx.relation_type(&right).column_types)
3894 .collect(),
3895 ),
3896 allow_aggregates: false,
3897 allow_subqueries: false,
3898 allow_parameters: false,
3899 allow_windows: false,
3900 };
3901
3902 let mut join_exprs = vec![];
3903 let mut map_exprs = vec![];
3904 let mut new_items = vec![];
3905 let mut join_cols = vec![];
3906 let mut hidden_cols = vec![];
3907
3908 for column_name in column_names {
3909 let (lhs, lhs_name) = left_scope.resolve_using_column(
3911 column_name,
3912 JoinSide::Left,
3913 &mut left_qcx.name_manager.borrow_mut(),
3914 )?;
3915 let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3916 column_name,
3917 JoinSide::Right,
3918 &mut right_qcx.name_manager.borrow_mut(),
3919 )?;
3920
3921 rhs.column += left_scope.len();
3923
3924 let mut exprs = coerce_homogeneous_exprs(
3926 &ecx.with_name(&format!(
3927 "NATURAL/USING join column {}",
3928 column_name.quoted()
3929 )),
3930 vec![
3931 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3932 lhs,
3933 Arc::clone(&lhs_name),
3934 )),
3935 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3936 rhs,
3937 Arc::clone(&rhs_name),
3938 )),
3939 ],
3940 None,
3941 )?;
3942 let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3943
3944 match kind {
3945 JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3946 join_cols.push(lhs.column);
3947 hidden_cols.push(rhs.column);
3948 }
3949 JoinKind::RightOuter => {
3950 join_cols.push(rhs.column);
3951 hidden_cols.push(lhs.column);
3952 }
3953 JoinKind::FullOuter => {
3954 join_cols.push(both_scope.items.len() + map_exprs.len());
3957 hidden_cols.push(lhs.column);
3958 hidden_cols.push(rhs.column);
3959 map_exprs.push(HirScalarExpr::call_variadic(
3960 Coalesce,
3961 vec![expr1.clone(), expr2.clone()],
3962 ));
3963 new_items.push(ScopeItem::from_column_name(column_name));
3964 }
3965 }
3966
3967 if alias_item_name.is_some() {
3972 let new_item_col = both_scope.items.len() + new_items.len();
3973 join_cols.push(new_item_col);
3974 hidden_cols.push(new_item_col);
3975
3976 new_items.push(ScopeItem::from_name(
3977 alias_item_name.clone(),
3978 column_name.clone().to_string(),
3979 ));
3980
3981 let alias_expr = match kind {
3988 JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3989 HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name))
3990 }
3991 JoinKind::RightOuter => HirScalarExpr::named_column(rhs, Arc::clone(&rhs_name)),
3992 JoinKind::FullOuter => {
3993 HirScalarExpr::call_variadic(Coalesce, vec![expr1.clone(), expr2.clone()])
3994 }
3995 };
3996 map_exprs.push(alias_expr);
3997 }
3998
3999 join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
4000 }
4001 both_scope.items.extend(new_items);
4002
4003 for c in hidden_cols {
4007 both_scope.items[c].allow_unqualified_references = false;
4008 }
4009
4010 let project_key = join_cols
4012 .into_iter()
4013 .chain(0..both_scope.items.len())
4014 .unique()
4015 .collect::<Vec<_>>();
4016
4017 both_scope = both_scope.project(&project_key);
4018
4019 let on = HirScalarExpr::variadic_and(join_exprs);
4020
4021 let both = left
4022 .join(right, on, kind)
4023 .map(map_exprs)
4024 .project(project_key);
4025 Ok((both, both_scope))
4026}
4027
4028pub fn plan_expr<'a>(
4029 ecx: &'a ExprContext,
4030 e: &Expr<Aug>,
4031) -> Result<CoercibleScalarExpr, PlanError> {
4032 ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
4033}
4034
4035fn plan_expr_inner<'a>(
4036 ecx: &'a ExprContext,
4037 e: &Expr<Aug>,
4038) -> Result<CoercibleScalarExpr, PlanError> {
4039 if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4040 return Ok(HirScalarExpr::named_column(
4042 i,
4043 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4044 )
4045 .into());
4046 }
4047
4048 match e {
4049 Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4051 Ok(plan_identifier(ecx, names)?.into())
4052 }
4053
4054 Expr::Value(val) => plan_literal(val),
4056 Expr::Parameter(n) => plan_parameter(ecx, *n),
4057 Expr::Array(exprs) => plan_array(ecx, exprs, None),
4058 Expr::List(exprs) => plan_list(ecx, exprs, None),
4059 Expr::Map(exprs) => plan_map(ecx, exprs, None),
4060 Expr::Row { exprs } => plan_row(ecx, exprs),
4061
4062 Expr::Op { op, expr1, expr2 } => {
4064 Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4065 }
4066 Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4067 Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4068
4069 Expr::Not { expr } => plan_not(ecx, expr),
4071 Expr::And { left, right } => plan_and(ecx, left, right),
4072 Expr::Or { left, right } => plan_or(ecx, left, right),
4073 Expr::IsExpr {
4074 expr,
4075 construct,
4076 negated,
4077 } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4078 Expr::Case {
4079 operand,
4080 conditions,
4081 results,
4082 else_result,
4083 } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4084 Expr::HomogenizingFunction { function, exprs } => {
4085 plan_homogenizing_function(ecx, function, exprs)
4086 }
4087 Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4088 ecx,
4089 &None,
4090 &[l_expr.clone().equals(*r_expr.clone())],
4091 &[Expr::null()],
4092 &Some(Box::new(*l_expr.clone())),
4093 )?
4094 .into()),
4095 Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4096 Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4097 Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4098 Expr::Like {
4099 expr,
4100 pattern,
4101 escape,
4102 case_insensitive,
4103 negated,
4104 } => Ok(plan_like(
4105 ecx,
4106 expr,
4107 pattern,
4108 escape.as_deref(),
4109 *case_insensitive,
4110 *negated,
4111 )?
4112 .into()),
4113
4114 Expr::InList {
4115 expr,
4116 list,
4117 negated,
4118 } => plan_in_list(ecx, expr, list, negated),
4119
4120 Expr::Exists(query) => plan_exists(ecx, query),
4122 Expr::Subquery(query) => plan_subquery(ecx, query),
4123 Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4124 Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4125 Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4126 Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4127 Expr::Nested(_) => bail_internal!("Expr::Nested should have been desugared"),
4128 Expr::InSubquery { .. } => {
4129 bail_internal!("Expr::InSubquery should have been desugared")
4130 }
4131 Expr::AnyExpr { .. } => {
4132 bail_internal!("Expr::AnyExpr should have been desugared")
4133 }
4134 Expr::AllExpr { .. } => {
4135 bail_internal!("Expr::AllExpr should have been desugared")
4136 }
4137 Expr::AnySubquery { .. } => {
4138 bail_internal!("Expr::AnySubquery should have been desugared")
4139 }
4140 Expr::AllSubquery { .. } => {
4141 bail_internal!("Expr::AllSubquery should have been desugared")
4142 }
4143 Expr::Between { .. } => {
4144 bail_internal!("Expr::Between should have been desugared")
4145 }
4146 }
4147}
4148
4149fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4150 if !ecx.allow_parameters {
4151 return Err(PlanError::UnknownParameter(n));
4155 }
4156 if n == 0 || n > 65536 {
4157 return Err(PlanError::UnknownParameter(n));
4158 }
4159 if ecx.param_types().borrow().contains_key(&n) {
4160 Ok(HirScalarExpr::parameter(n).into())
4161 } else {
4162 Ok(CoercibleScalarExpr::Parameter(n))
4163 }
4164}
4165
4166fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4167 let mut out = vec![];
4168 for e in exprs {
4169 out.push(plan_expr(ecx, e)?);
4170 }
4171 Ok(CoercibleScalarExpr::LiteralRecord(out))
4172}
4173
4174fn plan_cast(
4175 ecx: &ExprContext,
4176 expr: &Expr<Aug>,
4177 data_type: &ResolvedDataType,
4178) -> Result<CoercibleScalarExpr, PlanError> {
4179 let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4180 let expr = match expr {
4181 Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4190 Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4191 Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4192 _ => plan_expr(ecx, expr)?,
4193 };
4194 let ecx = &ecx.with_name("CAST");
4195 let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4196 let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4197 Ok(expr.into())
4198}
4199
4200fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4201 let ecx = ecx.with_name("NOT argument");
4202 Ok(plan_expr(&ecx, expr)?
4203 .type_as(&ecx, &SqlScalarType::Bool)?
4204 .call_unary(UnaryFunc::Not(expr_func::Not))
4205 .into())
4206}
4207
4208fn plan_and(
4209 ecx: &ExprContext,
4210 left: &Expr<Aug>,
4211 right: &Expr<Aug>,
4212) -> Result<CoercibleScalarExpr, PlanError> {
4213 let ecx = ecx.with_name("AND argument");
4214 Ok(HirScalarExpr::variadic_and(vec![
4215 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4216 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4217 ])
4218 .into())
4219}
4220
4221fn plan_or(
4222 ecx: &ExprContext,
4223 left: &Expr<Aug>,
4224 right: &Expr<Aug>,
4225) -> Result<CoercibleScalarExpr, PlanError> {
4226 let ecx = ecx.with_name("OR argument");
4227 Ok(HirScalarExpr::variadic_or(vec![
4228 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4229 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4230 ])
4231 .into())
4232}
4233
4234fn plan_in_list(
4235 ecx: &ExprContext,
4236 lhs: &Expr<Aug>,
4237 list: &Vec<Expr<Aug>>,
4238 negated: &bool,
4239) -> Result<CoercibleScalarExpr, PlanError> {
4240 let ecx = ecx.with_name("IN list");
4241 let or = HirScalarExpr::variadic_or(
4242 list.into_iter()
4243 .map(|e| {
4244 let eq = lhs.clone().equals(e.clone());
4245 plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4246 })
4247 .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4248 );
4249 Ok(if *negated {
4250 or.call_unary(UnaryFunc::Not(expr_func::Not))
4251 } else {
4252 or
4253 }
4254 .into())
4255}
4256
4257fn plan_homogenizing_function(
4258 ecx: &ExprContext,
4259 function: &HomogenizingFunction,
4260 exprs: &[Expr<Aug>],
4261) -> Result<CoercibleScalarExpr, PlanError> {
4262 assert!(!exprs.is_empty()); let expr = HirScalarExpr::call_variadic(
4264 match function {
4265 HomogenizingFunction::Coalesce => VariadicFunc::from(Coalesce),
4266 HomogenizingFunction::Greatest => VariadicFunc::from(Greatest),
4267 HomogenizingFunction::Least => VariadicFunc::from(Least),
4268 },
4269 coerce_homogeneous_exprs(
4270 &ecx.with_name(&function.to_string().to_lowercase()),
4271 plan_exprs(ecx, exprs)?,
4272 None,
4273 )?,
4274 );
4275 Ok(expr.into())
4276}
4277
4278fn plan_field_access(
4279 ecx: &ExprContext,
4280 expr: &Expr<Aug>,
4281 field: &Ident,
4282) -> Result<CoercibleScalarExpr, PlanError> {
4283 let field = normalize::column_name(field.clone());
4284 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4285 let ty = ecx.scalar_type(&expr);
4286 let i = match &ty {
4287 SqlScalarType::Record { fields, .. } => {
4288 fields.iter().position(|(name, _ty)| *name == field)
4289 }
4290 ty => sql_bail!(
4291 "column notation applied to type {}, which is not a composite type",
4292 ecx.humanize_sql_scalar_type(ty, false)
4293 ),
4294 };
4295 match i {
4296 None => sql_bail!(
4297 "field {} not found in data type {}",
4298 field,
4299 ecx.humanize_sql_scalar_type(&ty, false)
4300 ),
4301 Some(i) => Ok(expr
4302 .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4303 .into()),
4304 }
4305}
4306
4307fn plan_subscript(
4308 ecx: &ExprContext,
4309 expr: &Expr<Aug>,
4310 positions: &[SubscriptPosition<Aug>],
4311) -> Result<CoercibleScalarExpr, PlanError> {
4312 assert!(
4313 !positions.is_empty(),
4314 "subscript expression must contain at least one position"
4315 );
4316
4317 let ecx = &ecx.with_name("subscripting");
4318 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4319 let ty = ecx.scalar_type(&expr);
4320 match &ty {
4321 SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4322 ecx,
4323 expr,
4324 positions,
4325 if ty == SqlScalarType::Int2Vector {
4329 1
4330 } else {
4331 0
4332 },
4333 ),
4334 SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4335 SqlScalarType::List { element_type, .. } => {
4336 let elem_type_name = ecx.humanize_sql_scalar_type(element_type, false);
4338 let n_layers = ty.unwrap_list_n_layers();
4339 plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4340 }
4341 ty => sql_bail!(
4342 "cannot subscript type {}",
4343 ecx.humanize_sql_scalar_type(ty, false)
4344 ),
4345 }
4346}
4347
4348fn extract_scalar_subscript_from_positions<'a>(
4352 positions: &'a [SubscriptPosition<Aug>],
4353 expr_type_name: &str,
4354) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4355 let mut scalar_subscripts = Vec::with_capacity(positions.len());
4356 for p in positions {
4357 if p.explicit_slice {
4358 sql_bail!("{} subscript does not support slices", expr_type_name);
4359 }
4360 assert!(
4361 p.end.is_none(),
4362 "index-appearing subscripts cannot have end value"
4363 );
4364 scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4365 }
4366 Ok(scalar_subscripts)
4367}
4368
4369fn plan_subscript_array(
4370 ecx: &ExprContext,
4371 expr: HirScalarExpr,
4372 positions: &[SubscriptPosition<Aug>],
4373 offset: i64,
4374) -> Result<CoercibleScalarExpr, PlanError> {
4375 let mut exprs = Vec::with_capacity(positions.len() + 1);
4376 exprs.push(expr);
4377
4378 let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4381
4382 for i in indexes {
4383 exprs.push(plan_expr(ecx, i)?.cast_to(
4384 ecx,
4385 CastContext::Explicit,
4386 &SqlScalarType::Int64,
4387 )?);
4388 }
4389
4390 Ok(HirScalarExpr::call_variadic(ArrayIndex { offset }, exprs).into())
4391}
4392
4393fn plan_subscript_list(
4394 ecx: &ExprContext,
4395 mut expr: HirScalarExpr,
4396 positions: &[SubscriptPosition<Aug>],
4397 mut remaining_layers: usize,
4398 elem_type_name: &str,
4399) -> Result<CoercibleScalarExpr, PlanError> {
4400 let mut i = 0;
4401
4402 while i < positions.len() {
4403 let j = positions[i..]
4405 .iter()
4406 .position(|p| p.explicit_slice)
4407 .unwrap_or(positions.len() - i);
4408 if j != 0 {
4409 let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4410 let (n, e) = plan_index_list(
4411 ecx,
4412 expr,
4413 indexes.as_slice(),
4414 remaining_layers,
4415 elem_type_name,
4416 )?;
4417 remaining_layers = n;
4418 expr = e;
4419 i += j;
4420 }
4421
4422 let j = positions[i..]
4424 .iter()
4425 .position(|p| !p.explicit_slice)
4426 .unwrap_or(positions.len() - i);
4427 if j != 0 {
4428 expr = plan_slice_list(
4429 ecx,
4430 expr,
4431 &positions[i..i + j],
4432 remaining_layers,
4433 elem_type_name,
4434 )?;
4435 i += j;
4436 }
4437 }
4438
4439 Ok(expr.into())
4440}
4441
4442fn plan_index_list(
4443 ecx: &ExprContext,
4444 expr: HirScalarExpr,
4445 indexes: &[&Expr<Aug>],
4446 n_layers: usize,
4447 elem_type_name: &str,
4448) -> Result<(usize, HirScalarExpr), PlanError> {
4449 let depth = indexes.len();
4450
4451 if depth > n_layers {
4452 if n_layers == 0 {
4453 sql_bail!("cannot subscript type {}", elem_type_name)
4454 } else {
4455 sql_bail!(
4456 "cannot index into {} layers; list only has {} layer{}",
4457 depth,
4458 n_layers,
4459 if n_layers == 1 { "" } else { "s" }
4460 )
4461 }
4462 }
4463
4464 let mut exprs = Vec::with_capacity(depth + 1);
4465 exprs.push(expr);
4466
4467 for i in indexes {
4468 exprs.push(plan_expr(ecx, i)?.cast_to(
4469 ecx,
4470 CastContext::Explicit,
4471 &SqlScalarType::Int64,
4472 )?);
4473 }
4474
4475 Ok((
4476 n_layers - depth,
4477 HirScalarExpr::call_variadic(ListIndex, exprs),
4478 ))
4479}
4480
4481fn plan_slice_list(
4482 ecx: &ExprContext,
4483 expr: HirScalarExpr,
4484 slices: &[SubscriptPosition<Aug>],
4485 n_layers: usize,
4486 elem_type_name: &str,
4487) -> Result<HirScalarExpr, PlanError> {
4488 if n_layers == 0 {
4489 sql_bail!("cannot subscript type {}", elem_type_name)
4490 }
4491
4492 let mut exprs = Vec::with_capacity(slices.len() + 1);
4494 exprs.push(expr);
4495 let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4497 Ok(match position {
4498 Some(p) => {
4499 plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4500 }
4501 None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4502 })
4503 };
4504 for p in slices {
4505 let start = extract_position_or_default(p.start.as_ref(), 1)?;
4506 let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4507 exprs.push(start);
4508 exprs.push(end);
4509 }
4510
4511 Ok(HirScalarExpr::call_variadic(ListSliceLinear, exprs))
4512}
4513
4514fn plan_like(
4515 ecx: &ExprContext,
4516 expr: &Expr<Aug>,
4517 pattern: &Expr<Aug>,
4518 escape: Option<&Expr<Aug>>,
4519 case_insensitive: bool,
4520 not: bool,
4521) -> Result<HirScalarExpr, PlanError> {
4522 use CastContext::Implicit;
4523 let ecx = ecx.with_name("LIKE argument");
4524 let expr = plan_expr(&ecx, expr)?;
4525 let haystack = match ecx.scalar_type(&expr) {
4526 CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4527 .type_as(&ecx, ty)?
4528 .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4529 _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4530 };
4531 let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4532 if let Some(escape) = escape {
4533 pattern = pattern.call_binary(
4534 plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4535 expr_func::LikeEscape,
4536 );
4537 }
4538 let func: BinaryFunc = if case_insensitive {
4539 expr_func::IsLikeMatchCaseInsensitive.into()
4540 } else {
4541 expr_func::IsLikeMatchCaseSensitive.into()
4542 };
4543 let like = haystack.call_binary(pattern, func);
4544 if not {
4545 Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4546 } else {
4547 Ok(like)
4548 }
4549}
4550
4551fn plan_subscript_jsonb(
4552 ecx: &ExprContext,
4553 expr: HirScalarExpr,
4554 positions: &[SubscriptPosition<Aug>],
4555) -> Result<CoercibleScalarExpr, PlanError> {
4556 use CastContext::Implicit;
4557 use SqlScalarType::{Int64, String};
4558
4559 let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4562
4563 let mut exprs = Vec::with_capacity(subscripts.len());
4564 for s in subscripts {
4565 let subscript = plan_expr(ecx, s)?;
4566 let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4567 subscript
4568 } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4569 typeconv::to_string(ecx, subscript)
4573 } else {
4574 sql_bail!("jsonb subscript type must be coercible to integer or text");
4575 };
4576 exprs.push(subscript);
4577 }
4578
4579 let expr = expr.call_binary(
4582 HirScalarExpr::call_variadic(
4583 ArrayCreate {
4584 elem_type: SqlScalarType::String,
4585 },
4586 exprs,
4587 ),
4588 expr_func::JsonbGetPath,
4589 );
4590 Ok(expr.into())
4591}
4592
4593fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4594 if !ecx.allow_subqueries {
4595 sql_bail!("{} does not allow subqueries", ecx.name)
4596 }
4597 let mut qcx = ecx.derived_query_context();
4598 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4599 Ok(expr.exists().into())
4600}
4601
4602fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4603 if !ecx.allow_subqueries {
4604 sql_bail!("{} does not allow subqueries", ecx.name)
4605 }
4606 let mut qcx = ecx.derived_query_context();
4607 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4608 let column_types = qcx.relation_type(&expr).column_types;
4609 if column_types.len() != 1 {
4610 sql_bail!(
4611 "Expected subselect to return 1 column, got {} columns",
4612 column_types.len()
4613 );
4614 }
4615 Ok(expr.select().into())
4616}
4617
4618fn plan_list_subquery(
4619 ecx: &ExprContext,
4620 query: &Query<Aug>,
4621) -> Result<CoercibleScalarExpr, PlanError> {
4622 plan_vector_like_subquery(
4623 ecx,
4624 query,
4625 |_| false,
4626 |elem_type| ListCreate { elem_type }.into(),
4627 |order_by| AggregateFunc::ListConcat { order_by },
4628 expr_func::ListListConcat.into(),
4629 |elem_type| {
4630 HirScalarExpr::literal(
4631 Datum::empty_list(),
4632 SqlScalarType::List {
4633 element_type: Box::new(elem_type),
4634 custom_id: None,
4635 },
4636 )
4637 },
4638 "list",
4639 )
4640}
4641
4642fn plan_array_subquery(
4643 ecx: &ExprContext,
4644 query: &Query<Aug>,
4645) -> Result<CoercibleScalarExpr, PlanError> {
4646 plan_vector_like_subquery(
4647 ecx,
4648 query,
4649 |elem_type| {
4650 matches!(
4651 elem_type,
4652 SqlScalarType::Char { .. }
4653 | SqlScalarType::Array { .. }
4654 | SqlScalarType::List { .. }
4655 | SqlScalarType::Map { .. }
4656 )
4657 },
4658 |elem_type| ArrayCreate { elem_type }.into(),
4659 |order_by| AggregateFunc::ArrayConcat { order_by },
4660 expr_func::ArrayArrayConcat.into(),
4661 |elem_type| {
4662 HirScalarExpr::literal(
4663 Datum::empty_array(),
4664 SqlScalarType::Array(Box::new(elem_type)),
4665 )
4666 },
4667 "[]",
4668 )
4669}
4670
4671fn plan_vector_like_subquery<F1, F2, F3, F4>(
4673 ecx: &ExprContext,
4674 query: &Query<Aug>,
4675 is_unsupported_type: F1,
4676 vector_create: F2,
4677 aggregate_concat: F3,
4678 binary_concat: BinaryFunc,
4679 empty_literal: F4,
4680 vector_type_string: &str,
4681) -> Result<CoercibleScalarExpr, PlanError>
4682where
4683 F1: Fn(&SqlScalarType) -> bool,
4684 F2: Fn(SqlScalarType) -> VariadicFunc,
4685 F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4686 F4: Fn(SqlScalarType) -> HirScalarExpr,
4687{
4688 if !ecx.allow_subqueries {
4689 sql_bail!("{} does not allow subqueries", ecx.name)
4690 }
4691
4692 let mut qcx = ecx.derived_query_context();
4693 let mut planned_query = plan_query(&mut qcx, query)?;
4694 if planned_query.limit.is_some()
4695 || !planned_query
4696 .offset
4697 .clone()
4698 .try_into_literal_int64()
4699 .is_ok_and(|offset| offset == 0)
4700 {
4701 planned_query.expr = HirRelationExpr::top_k(
4702 planned_query.expr,
4703 vec![],
4704 planned_query.order_by.clone(),
4705 planned_query.limit,
4706 planned_query.offset,
4707 planned_query.group_size_hints.limit_input_group_size,
4708 );
4709 }
4710
4711 if planned_query.project.len() != 1 {
4712 sql_bail!(
4713 "Expected subselect to return 1 column, got {} columns",
4714 planned_query.project.len()
4715 );
4716 }
4717
4718 let project_column = *planned_query.project.get(0).unwrap();
4719 let elem_type = qcx
4720 .relation_type(&planned_query.expr)
4721 .column_types
4722 .get(project_column)
4723 .cloned()
4724 .unwrap()
4725 .scalar_type();
4726
4727 if is_unsupported_type(&elem_type) {
4728 bail_unsupported!(format!(
4729 "cannot build array from subquery because return type {}{}",
4730 ecx.humanize_sql_scalar_type(&elem_type, false),
4731 vector_type_string
4732 ));
4733 }
4734
4735 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4738 vector_create(elem_type.clone()),
4739 vec![HirScalarExpr::column(project_column)],
4740 ))
4741 .chain(
4742 planned_query
4743 .order_by
4744 .iter()
4745 .map(|co| HirScalarExpr::column(co.column)),
4746 )
4747 .collect();
4748
4749 let aggregation_projection = vec![0];
4753 let aggregation_order_by = planned_query
4754 .order_by
4755 .into_iter()
4756 .enumerate()
4757 .map(|(i, order)| ColumnOrder { column: i, ..order })
4758 .collect();
4759
4760 let reduced_expr = planned_query
4761 .expr
4762 .reduce(
4763 vec![],
4764 vec![AggregateExpr {
4765 func: aggregate_concat(aggregation_order_by),
4766 expr: Box::new(HirScalarExpr::call_variadic(
4767 RecordCreate {
4768 field_names: iter::repeat(ColumnName::from(""))
4769 .take(aggregation_exprs.len())
4770 .collect(),
4771 },
4772 aggregation_exprs,
4773 )),
4774 distinct: false,
4775 }],
4776 None,
4777 )
4778 .project(aggregation_projection);
4779
4780 Ok(reduced_expr
4782 .select()
4783 .call_binary(empty_literal(elem_type), binary_concat)
4784 .into())
4785}
4786
4787fn plan_map_subquery(
4788 ecx: &ExprContext,
4789 query: &Query<Aug>,
4790) -> Result<CoercibleScalarExpr, PlanError> {
4791 if !ecx.allow_subqueries {
4792 sql_bail!("{} does not allow subqueries", ecx.name)
4793 }
4794
4795 let mut qcx = ecx.derived_query_context();
4796 let mut query = plan_query(&mut qcx, query)?;
4797 if query.limit.is_some()
4798 || !query
4799 .offset
4800 .clone()
4801 .try_into_literal_int64()
4802 .is_ok_and(|offset| offset == 0)
4803 {
4804 query.expr = HirRelationExpr::top_k(
4805 query.expr,
4806 vec![],
4807 query.order_by.clone(),
4808 query.limit,
4809 query.offset,
4810 query.group_size_hints.limit_input_group_size,
4811 );
4812 }
4813 if query.project.len() != 2 {
4814 sql_bail!(
4815 "expected map subquery to return 2 columns, got {} columns",
4816 query.project.len()
4817 );
4818 }
4819
4820 let query_types = qcx.relation_type(&query.expr).column_types;
4821 let key_column = query.project[0];
4822 let key_type = query_types[key_column].clone().scalar_type();
4823 let value_column = query.project[1];
4824 let value_type = query_types[value_column].clone().scalar_type();
4825
4826 if key_type != SqlScalarType::String {
4827 sql_bail!("cannot build map from subquery because first column is not of type text");
4828 }
4829
4830 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4831 RecordCreate {
4832 field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4833 },
4834 vec![
4835 HirScalarExpr::column(key_column),
4836 HirScalarExpr::column(value_column),
4837 ],
4838 ))
4839 .chain(
4840 query
4841 .order_by
4842 .iter()
4843 .map(|co| HirScalarExpr::column(co.column)),
4844 )
4845 .collect();
4846
4847 let expr = query
4848 .expr
4849 .reduce(
4850 vec![],
4851 vec![AggregateExpr {
4852 func: AggregateFunc::MapAgg {
4853 order_by: query
4854 .order_by
4855 .into_iter()
4856 .enumerate()
4857 .map(|(i, order)| ColumnOrder { column: i, ..order })
4858 .collect(),
4859 value_type: value_type.clone(),
4860 },
4861 expr: Box::new(HirScalarExpr::call_variadic(
4862 RecordCreate {
4863 field_names: iter::repeat(ColumnName::from(""))
4864 .take(aggregation_exprs.len())
4865 .collect(),
4866 },
4867 aggregation_exprs,
4868 )),
4869 distinct: false,
4870 }],
4871 None,
4872 )
4873 .project(vec![0]);
4874
4875 let expr = HirScalarExpr::call_variadic(
4877 Coalesce,
4878 vec![
4879 expr.select(),
4880 HirScalarExpr::literal(
4881 Datum::empty_map(),
4882 SqlScalarType::Map {
4883 value_type: Box::new(value_type),
4884 custom_id: None,
4885 },
4886 ),
4887 ],
4888 );
4889
4890 Ok(expr.into())
4891}
4892
4893fn plan_collate(
4894 ecx: &ExprContext,
4895 expr: &Expr<Aug>,
4896 collation: &UnresolvedItemName,
4897) -> Result<CoercibleScalarExpr, PlanError> {
4898 if collation.0.len() == 2
4899 && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4900 && collation.0[1] == ident!("default")
4901 {
4902 plan_expr(ecx, expr)
4903 } else {
4904 bail_unsupported!("COLLATE");
4905 }
4906}
4907
4908fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4915where
4916 E: std::borrow::Borrow<Expr<Aug>>,
4917{
4918 let mut out = vec![];
4919 for expr in exprs {
4920 out.push(plan_expr(ecx, expr.borrow())?);
4921 }
4922 Ok(out)
4923}
4924
4925fn plan_array(
4927 ecx: &ExprContext,
4928 exprs: &[Expr<Aug>],
4929 type_hint: Option<&SqlScalarType>,
4930) -> Result<CoercibleScalarExpr, PlanError> {
4931 let mut out = vec![];
4933 for expr in exprs {
4934 out.push(match expr {
4935 Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4938 _ => plan_expr(ecx, expr)?,
4939 });
4940 }
4941
4942 let type_hint = match type_hint {
4944 Some(SqlScalarType::Array(elem_type)) => {
4949 let multidimensional = out.iter().any(|e| {
4950 matches!(
4951 ecx.scalar_type(e),
4952 CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4953 )
4954 });
4955 if multidimensional {
4956 type_hint
4957 } else {
4958 Some(&**elem_type)
4959 }
4960 }
4961 Some(_) => None,
4965 None => None,
4967 };
4968
4969 let (elem_type, exprs) = if exprs.is_empty() {
4971 if let Some(elem_type) = type_hint {
4972 (elem_type.clone(), vec![])
4973 } else {
4974 sql_bail!("cannot determine type of empty array");
4975 }
4976 } else {
4977 let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4978 (ecx.scalar_type(&out[0]), out)
4979 };
4980
4981 if matches!(
4987 elem_type,
4988 SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4989 ) {
4990 bail_unsupported!(format!(
4991 "{}[]",
4992 ecx.humanize_sql_scalar_type(&elem_type, false)
4993 ));
4994 }
4995
4996 Ok(HirScalarExpr::call_variadic(ArrayCreate { elem_type }, exprs).into())
4997}
4998
4999fn plan_list(
5000 ecx: &ExprContext,
5001 exprs: &[Expr<Aug>],
5002 type_hint: Option<&SqlScalarType>,
5003) -> Result<CoercibleScalarExpr, PlanError> {
5004 let (elem_type, exprs) = if exprs.is_empty() {
5005 if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
5006 (element_type.without_modifiers(), vec![])
5007 } else {
5008 sql_bail!("cannot determine type of empty list");
5009 }
5010 } else {
5011 let type_hint = match type_hint {
5012 Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
5013 _ => None,
5014 };
5015
5016 let mut out = vec![];
5017 for expr in exprs {
5018 out.push(match expr {
5019 Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
5022 _ => plan_expr(ecx, expr)?,
5023 });
5024 }
5025 let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
5026 (ecx.scalar_type(&out[0]).without_modifiers(), out)
5027 };
5028
5029 if matches!(elem_type, SqlScalarType::Char { .. }) {
5030 bail_unsupported!("char list");
5031 }
5032
5033 Ok(HirScalarExpr::call_variadic(ListCreate { elem_type }, exprs).into())
5034}
5035
5036fn plan_map(
5037 ecx: &ExprContext,
5038 entries: &[MapEntry<Aug>],
5039 type_hint: Option<&SqlScalarType>,
5040) -> Result<CoercibleScalarExpr, PlanError> {
5041 let (value_type, exprs) = if entries.is_empty() {
5042 if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
5043 (value_type.without_modifiers(), vec![])
5044 } else {
5045 sql_bail!("cannot determine type of empty map");
5046 }
5047 } else {
5048 let type_hint = match type_hint {
5049 Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5050 _ => None,
5051 };
5052
5053 let mut keys = vec![];
5054 let mut values = vec![];
5055 for MapEntry { key, value } in entries {
5056 let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5057 let value = match value {
5058 Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5061 _ => plan_expr(ecx, value)?,
5062 };
5063 keys.push(key);
5064 values.push(value);
5065 }
5066 let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5067 let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5068 let out = itertools::interleave(keys, values).collect();
5069 (value_type, out)
5070 };
5071
5072 if matches!(value_type, SqlScalarType::Char { .. }) {
5073 bail_unsupported!("char map");
5074 }
5075
5076 let expr = HirScalarExpr::call_variadic(MapBuild { value_type }, exprs);
5077 Ok(expr.into())
5078}
5079
5080pub fn coerce_homogeneous_exprs(
5097 ecx: &ExprContext,
5098 exprs: Vec<CoercibleScalarExpr>,
5099 force_type: Option<&SqlScalarType>,
5100) -> Result<Vec<HirScalarExpr>, PlanError> {
5101 assert!(!exprs.is_empty());
5102
5103 let target_holder;
5104 let target = match force_type {
5105 Some(t) => t,
5106 None => {
5107 let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5108 target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5109 &target_holder
5110 }
5111 };
5112
5113 let mut out = Vec::new();
5115 for expr in exprs {
5116 let arg = typeconv::plan_coerce(ecx, expr, target)?;
5117 let ccx = match force_type {
5118 None => CastContext::Implicit,
5119 Some(_) => CastContext::Explicit,
5120 };
5121 match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5122 Ok(expr) => out.push(expr),
5123 Err(_) => sql_bail!(
5124 "{} could not convert type {} to {}",
5125 ecx.name,
5126 ecx.humanize_sql_scalar_type(&ecx.scalar_type(&arg), false),
5127 ecx.humanize_sql_scalar_type(target, false),
5128 ),
5129 }
5130 }
5131 Ok(out)
5132}
5133
5134pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5137 obe: &OrderByExpr<T>,
5138 column: usize,
5139) -> ColumnOrder {
5140 let desc = !obe.asc.unwrap_or(true);
5141 ColumnOrder {
5142 column,
5143 desc,
5144 nulls_last: obe.nulls_last.unwrap_or(!desc),
5147 }
5148}
5149
5150fn plan_function_order_by(
5158 ecx: &ExprContext,
5159 order_by: &[OrderByExpr<Aug>],
5160) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5161 let mut order_by_exprs = vec![];
5162 let mut col_orders = vec![];
5163 {
5164 for (i, obe) in order_by.iter().enumerate() {
5165 let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5169 order_by_exprs.push(expr);
5170 col_orders.push(resolve_desc_and_nulls_last(obe, i));
5171 }
5172 }
5173 Ok((order_by_exprs, col_orders))
5174}
5175
5176fn humanize_or_debug(scx: &StatementContext, name: &ResolvedItemName) -> String {
5181 scx.humanize_resolved_name(name)
5182 .map(|n| n.to_string())
5183 .unwrap_or_else(|_| format!("<error when trying to humanize `{name:?}`>"))
5184}
5185
5186fn plan_aggregate_common(
5188 ecx: &ExprContext,
5189 Function::<Aug> {
5190 name,
5191 args,
5192 filter,
5193 over: _,
5194 distinct,
5195 }: &Function<Aug>,
5196) -> Result<AggregateExpr, PlanError> {
5197 let impls = match resolve_func(ecx, name, args)? {
5212 Func::Aggregate(impls) => impls,
5213 _ => bail_internal!("plan_aggregate_common called on non-aggregate function"),
5214 };
5215
5216 let (args, order_by) = match &args {
5225 FunctionArgs::Star => (vec![], vec![]),
5226 FunctionArgs::Args { args, order_by } => {
5227 if args.is_empty() {
5228 sql_bail!(
5229 "{}(*) must be used to call a parameterless aggregate function",
5230 humanize_or_debug(ecx.qcx.scx, name)
5231 );
5232 }
5233 let args = plan_exprs(ecx, args)?;
5234 (args, order_by.clone())
5235 }
5236 };
5237
5238 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5239
5240 let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5241 if let Some(filter) = &filter {
5242 let cond =
5252 plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5253 let expr_typ = ecx.scalar_type(&expr);
5254 expr = HirScalarExpr::if_then_else(
5255 cond,
5256 expr,
5257 HirScalarExpr::literal(func.identity_datum(), expr_typ),
5258 );
5259 }
5260
5261 let mut seen_outer = false;
5262 let mut seen_inner = false;
5263 #[allow(deprecated)]
5264 expr.visit_columns(0, &mut |depth, col| {
5265 if depth == 0 && col.level == 0 {
5266 seen_inner = true;
5267 } else if col.level > depth {
5268 seen_outer = true;
5269 }
5270 });
5271 if seen_outer && !seen_inner {
5272 bail_unsupported!(
5273 3720,
5274 "aggregate functions that refer exclusively to outer columns"
5275 );
5276 }
5277
5278 if func.is_order_sensitive() {
5281 let field_names = iter::repeat(ColumnName::from(""))
5282 .take(1 + order_by_exprs.len())
5283 .collect();
5284 let mut exprs = vec![expr];
5285 exprs.extend(order_by_exprs);
5286 expr = HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs);
5287 }
5288
5289 Ok(AggregateExpr {
5290 func,
5291 expr: Box::new(expr),
5292 distinct: *distinct,
5293 })
5294}
5295
5296fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
5297 let mut names = names.to_vec();
5298 let Some(last) = names.pop() else {
5301 bail_internal!("empty identifier");
5302 };
5303 let col_name = normalize::column_name(last);
5304
5305 if !names.is_empty() {
5307 let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
5308 let (i, i_name) = ecx.scope.resolve_table_column(
5309 &ecx.qcx.outer_scopes,
5310 &table_name,
5311 &col_name,
5312 &mut ecx.qcx.name_manager.borrow_mut(),
5313 )?;
5314 return Ok(HirScalarExpr::named_column(i, i_name));
5315 }
5316
5317 let similar_names = match ecx.scope.resolve_column(
5320 &ecx.qcx.outer_scopes,
5321 &col_name,
5322 &mut ecx.qcx.name_manager.borrow_mut(),
5323 ) {
5324 Ok((i, i_name)) => {
5325 return Ok(HirScalarExpr::named_column(i, i_name));
5326 }
5327 Err(PlanError::UnknownColumn { similar, .. }) => similar,
5328 Err(e) => return Err(e),
5329 };
5330
5331 let items = ecx.scope.items_from_table(
5334 &ecx.qcx.outer_scopes,
5335 &PartialItemName {
5336 database: None,
5337 schema: None,
5338 item: col_name.as_str().to_owned(),
5339 },
5340 )?;
5341 match items.as_slice() {
5342 [] => Err(PlanError::UnknownColumn {
5344 table: None,
5345 column: col_name,
5346 similar: similar_names,
5347 }),
5348 [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
5353 *column,
5354 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5355 )),
5356 _ => {
5359 let mut has_exists_column = None;
5360 let (exprs, field_names): (Vec<_>, Vec<_>) = items
5361 .into_iter()
5362 .filter_map(|(column, item)| {
5363 if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
5364 has_exists_column = Some(column);
5365 None
5366 } else {
5367 let expr = HirScalarExpr::named_column(
5368 column,
5369 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5370 );
5371 let name = item.column_name.clone();
5372 Some((expr, name))
5373 }
5374 })
5375 .unzip();
5376 let expr = if exprs.len() == 1 && has_exists_column.is_some() {
5378 exprs.into_element()
5379 } else {
5380 HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs)
5381 };
5382 if let Some(has_exists_column) = has_exists_column {
5383 Ok(HirScalarExpr::if_then_else(
5384 HirScalarExpr::unnamed_column(has_exists_column)
5385 .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
5386 HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
5387 expr,
5388 ))
5389 } else {
5390 Ok(expr)
5391 }
5392 }
5393 }
5394}
5395
5396fn plan_op(
5397 ecx: &ExprContext,
5398 op: &str,
5399 expr1: &Expr<Aug>,
5400 expr2: Option<&Expr<Aug>>,
5401) -> Result<HirScalarExpr, PlanError> {
5402 let impls = func::resolve_op(op)?;
5403 let args = match expr2 {
5404 None => plan_exprs(ecx, &[expr1])?,
5405 Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5406 };
5407 func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5408}
5409
5410fn plan_function<'a>(
5411 ecx: &ExprContext,
5412 f @ Function {
5413 name,
5414 args,
5415 filter,
5416 over,
5417 distinct,
5418 }: &'a Function<Aug>,
5419) -> Result<HirScalarExpr, PlanError> {
5420 let impls = match resolve_func(ecx, name, args)? {
5421 Func::Table(_) => {
5422 sql_bail!(
5423 "table functions are not allowed in {} (function {})",
5424 ecx.name,
5425 name
5426 );
5427 }
5428 Func::Scalar(impls) => {
5429 if over.is_some() {
5430 sql_bail!(
5431 "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
5432 );
5433 }
5434 impls
5435 }
5436 Func::ScalarWindow(impls) => {
5437 let (
5438 ignore_nulls,
5439 order_by_exprs,
5440 col_orders,
5441 _window_frame,
5442 partition_by,
5443 scalar_args,
5444 ) = plan_window_function_non_aggr(ecx, f)?;
5445
5446 if !scalar_args.is_empty() {
5450 if let ResolvedItemName::Item {
5451 full_name: FullItemName { item, .. },
5452 ..
5453 } = name
5454 {
5455 sql_bail!(
5456 "function {} has 0 parameters, but was called with {}",
5457 item,
5458 scalar_args.len()
5459 );
5460 }
5461 }
5462
5463 let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5468
5469 if ignore_nulls {
5470 bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5473 }
5474
5475 return Ok(HirScalarExpr::windowing(WindowExpr {
5476 func: WindowExprType::Scalar(ScalarWindowExpr {
5477 func,
5478 order_by: col_orders,
5479 }),
5480 partition_by,
5481 order_by: order_by_exprs,
5482 }));
5483 }
5484 Func::ValueWindow(impls) => {
5485 let window_plan = plan_window_function_non_aggr(ecx, f)?;
5486 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, win_args) =
5487 window_plan;
5488
5489 let (args_encoded, func) =
5490 func::select_impl(ecx, FuncSpec::Func(name), impls, win_args, vec![])?;
5491
5492 if ignore_nulls {
5493 match func {
5494 ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
5495 _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
5496 }
5497 }
5498
5499 return Ok(HirScalarExpr::windowing(WindowExpr {
5500 func: WindowExprType::Value(ValueWindowExpr {
5501 func,
5502 args: Box::new(args_encoded),
5503 order_by: col_orders,
5504 window_frame,
5505 ignore_nulls, }),
5507 partition_by,
5508 order_by: order_by_exprs,
5509 }));
5510 }
5511 Func::Aggregate(_) => {
5512 if f.over.is_none() {
5513 if ecx.allow_aggregates {
5515 sql_bail!(
5518 "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
5519 name,
5520 );
5521 } else {
5522 sql_bail!(
5525 "aggregate functions are not allowed in {} (function {})",
5526 ecx.name,
5527 name
5528 );
5529 }
5530 } else {
5531 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
5532 plan_window_function_common(ecx, &f.name, &f.over)?;
5533
5534 match (&window_frame.start_bound, &window_frame.end_bound) {
5536 (
5537 mz_expr::WindowFrameBound::UnboundedPreceding,
5538 mz_expr::WindowFrameBound::OffsetPreceding(..),
5539 )
5540 | (
5541 mz_expr::WindowFrameBound::UnboundedPreceding,
5542 mz_expr::WindowFrameBound::OffsetFollowing(..),
5543 )
5544 | (
5545 mz_expr::WindowFrameBound::OffsetPreceding(..),
5546 mz_expr::WindowFrameBound::UnboundedFollowing,
5547 )
5548 | (
5549 mz_expr::WindowFrameBound::OffsetFollowing(..),
5550 mz_expr::WindowFrameBound::UnboundedFollowing,
5551 ) => bail_unsupported!("mixed unbounded - offset frames"),
5552 (_, _) => {} }
5554
5555 if ignore_nulls {
5556 bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5560 }
5561
5562 let aggregate_expr = plan_aggregate_common(ecx, f)?;
5563
5564 if aggregate_expr.distinct {
5565 bail_unsupported!("DISTINCT in window aggregates");
5567 }
5568
5569 return Ok(HirScalarExpr::windowing(WindowExpr {
5570 func: WindowExprType::Aggregate(AggregateWindowExpr {
5571 aggregate_expr,
5572 order_by: col_orders,
5573 window_frame,
5574 }),
5575 partition_by,
5576 order_by: order_by_exprs,
5577 }));
5578 }
5579 }
5580 };
5581
5582 if over.is_some() {
5583 bail_internal!("OVER clause should have been handled by the window function path above");
5584 }
5585
5586 if *distinct {
5587 sql_bail!(
5588 "DISTINCT specified, but {} is not an aggregate function",
5589 humanize_or_debug(ecx.qcx.scx, name)
5590 );
5591 }
5592 if filter.is_some() {
5593 sql_bail!(
5594 "FILTER specified, but {} is not an aggregate function",
5595 humanize_or_debug(ecx.qcx.scx, name)
5596 );
5597 }
5598
5599 let scalar_args = match &args {
5600 FunctionArgs::Star => {
5601 sql_bail!(
5602 "* argument is invalid with non-aggregate function {}",
5603 humanize_or_debug(ecx.qcx.scx, name)
5604 )
5605 }
5606 FunctionArgs::Args { args, order_by } => {
5607 if !order_by.is_empty() {
5608 sql_bail!(
5609 "ORDER BY specified, but {} is not an aggregate function",
5610 humanize_or_debug(ecx.qcx.scx, name)
5611 );
5612 }
5613 plan_exprs(ecx, args)?
5614 }
5615 };
5616
5617 func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
5618}
5619
5620pub const IGNORE_NULLS_ERROR_MSG: &str =
5621 "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5622
5623pub fn resolve_func(
5627 ecx: &ExprContext,
5628 name: &ResolvedItemName,
5629 args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5630) -> Result<&'static Func, PlanError> {
5631 if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5632 if let Ok(f) = i.func() {
5633 return Ok(f);
5634 }
5635 }
5636
5637 let cexprs = match args {
5640 mz_sql_parser::ast::FunctionArgs::Star => vec![],
5641 mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5642 if !order_by.is_empty() {
5643 sql_bail!(
5644 "ORDER BY specified, but {} is not an aggregate function",
5645 name
5646 );
5647 }
5648 plan_exprs(ecx, args)?
5649 }
5650 };
5651
5652 let arg_types: Vec<_> = cexprs
5653 .into_iter()
5654 .map(|ty| match ecx.scalar_type(&ty) {
5655 CoercibleScalarType::Coerced(ty) => ecx.humanize_sql_scalar_type(&ty, false),
5656 CoercibleScalarType::Record(_) => "record".to_string(),
5657 CoercibleScalarType::Uncoerced => "unknown".to_string(),
5658 })
5659 .collect();
5660
5661 Err(PlanError::UnknownFunction {
5662 name: name.to_string(),
5663 arg_types,
5664 })
5665}
5666
5667fn plan_is_expr<'a>(
5668 ecx: &ExprContext,
5669 expr: &'a Expr<Aug>,
5670 construct: &IsExprConstruct<Aug>,
5671 not: bool,
5672) -> Result<HirScalarExpr, PlanError> {
5673 let expr_hir = plan_expr(ecx, expr)?;
5674
5675 let mut result = match construct {
5676 IsExprConstruct::Null => {
5677 expr_hir.type_as_any(ecx)?.call_is_null()
5682 }
5683 IsExprConstruct::Unknown => expr_hir.type_as(ecx, &SqlScalarType::Bool)?.call_is_null(),
5684 IsExprConstruct::True => expr_hir
5685 .type_as(ecx, &SqlScalarType::Bool)?
5686 .call_unary(UnaryFunc::IsTrue(expr_func::IsTrue)),
5687 IsExprConstruct::False => expr_hir
5688 .type_as(ecx, &SqlScalarType::Bool)?
5689 .call_unary(UnaryFunc::IsFalse(expr_func::IsFalse)),
5690 IsExprConstruct::DistinctFrom(expr2) => {
5691 let ne_ast = expr.clone().not_equals(expr2.as_ref().clone());
5702 let ne_hir = plan_expr(ecx, &ne_ast)?.type_as_any(ecx)?;
5703
5704 let expr1_hir = expr_hir.type_as_any(ecx)?;
5705 let expr2_hir = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5706
5707 let term1 = HirScalarExpr::variadic_or(vec![
5708 ne_hir,
5709 expr1_hir.clone().call_is_null(),
5710 expr2_hir.clone().call_is_null(),
5711 ]);
5712 let term2 = HirScalarExpr::variadic_or(vec![
5713 expr1_hir.call_is_null().not(),
5714 expr2_hir.call_is_null().not(),
5715 ]);
5716 term1.and(term2)
5717 }
5718 };
5719 if not {
5720 result = result.not();
5721 }
5722 Ok(result)
5723}
5724
5725fn plan_case<'a>(
5726 ecx: &ExprContext,
5727 operand: &'a Option<Box<Expr<Aug>>>,
5728 conditions: &'a [Expr<Aug>],
5729 results: &'a [Expr<Aug>],
5730 else_result: &'a Option<Box<Expr<Aug>>>,
5731) -> Result<HirScalarExpr, PlanError> {
5732 let mut cond_exprs = Vec::new();
5733 let mut result_exprs = Vec::new();
5734 for (c, r) in conditions.iter().zip_eq(results) {
5735 let c = match operand {
5736 Some(operand) => operand.clone().equals(c.clone()),
5737 None => c.clone(),
5738 };
5739 let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5740 cond_exprs.push(cexpr);
5741 result_exprs.push(r);
5742 }
5743 result_exprs.push(match else_result {
5744 Some(else_result) => else_result,
5745 None => &Expr::Value(Value::Null),
5746 });
5747 let mut result_exprs = coerce_homogeneous_exprs(
5748 &ecx.with_name("CASE"),
5749 plan_exprs(ecx, &result_exprs)?,
5750 None,
5751 )?;
5752 let mut expr = result_exprs.pop().unwrap();
5753 assert_eq!(cond_exprs.len(), result_exprs.len());
5754 for (cexpr, rexpr) in cond_exprs
5755 .into_iter()
5756 .rev()
5757 .zip_eq(result_exprs.into_iter().rev())
5758 {
5759 expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5760 }
5761 Ok(expr)
5762}
5763
5764fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5765 let (datum, scalar_type) = match l {
5766 Value::Number(s) => {
5767 let d = strconv::parse_numeric(s.as_str())?;
5768 if !s.contains(&['E', '.'][..]) {
5769 if let Ok(n) = d.0.try_into() {
5771 (Datum::Int32(n), SqlScalarType::Int32)
5772 } else if let Ok(n) = d.0.try_into() {
5773 (Datum::Int64(n), SqlScalarType::Int64)
5774 } else {
5775 (
5776 Datum::Numeric(d),
5777 SqlScalarType::Numeric { max_scale: None },
5778 )
5779 }
5780 } else {
5781 (
5782 Datum::Numeric(d),
5783 SqlScalarType::Numeric { max_scale: None },
5784 )
5785 }
5786 }
5787 Value::HexString(_) => bail_unsupported!("hex string literals"),
5788 Value::Boolean(b) => match b {
5789 false => (Datum::False, SqlScalarType::Bool),
5790 true => (Datum::True, SqlScalarType::Bool),
5791 },
5792 Value::Interval(i) => {
5793 let i = literal::plan_interval(i)?;
5794 (Datum::Interval(i), SqlScalarType::Interval)
5795 }
5796 Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5797 Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5798 };
5799 let expr = HirScalarExpr::literal(datum, scalar_type);
5800 Ok(expr.into())
5801}
5802
5803fn plan_window_function_non_aggr<'a>(
5806 ecx: &ExprContext,
5807 Function {
5808 name,
5809 args,
5810 filter,
5811 over,
5812 distinct,
5813 }: &'a Function<Aug>,
5814) -> Result<
5815 (
5816 bool,
5817 Vec<HirScalarExpr>,
5818 Vec<ColumnOrder>,
5819 mz_expr::WindowFrame,
5820 Vec<HirScalarExpr>,
5821 Vec<CoercibleScalarExpr>,
5822 ),
5823 PlanError,
5824> {
5825 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5826 plan_window_function_common(ecx, name, over)?;
5827
5828 if *distinct {
5829 sql_bail!(
5830 "DISTINCT specified, but {} is not an aggregate function",
5831 name
5832 );
5833 }
5834
5835 if filter.is_some() {
5836 bail_unsupported!("FILTER in non-aggregate window functions");
5837 }
5838
5839 let scalar_args = match &args {
5840 FunctionArgs::Star => {
5841 sql_bail!("* argument is invalid with non-aggregate function {}", name)
5842 }
5843 FunctionArgs::Args { args, order_by } => {
5844 if !order_by.is_empty() {
5845 sql_bail!(
5846 "ORDER BY specified, but {} is not an aggregate function",
5847 name
5848 );
5849 }
5850 plan_exprs(ecx, args)?
5851 }
5852 };
5853
5854 Ok((
5855 ignore_nulls,
5856 order_by_exprs,
5857 col_orders,
5858 window_frame,
5859 partition,
5860 scalar_args,
5861 ))
5862}
5863
5864fn plan_window_function_common(
5866 ecx: &ExprContext,
5867 name: &<Aug as AstInfo>::ItemName,
5868 over: &Option<WindowSpec<Aug>>,
5869) -> Result<
5870 (
5871 bool,
5872 Vec<HirScalarExpr>,
5873 Vec<ColumnOrder>,
5874 mz_expr::WindowFrame,
5875 Vec<HirScalarExpr>,
5876 ),
5877 PlanError,
5878> {
5879 if !ecx.allow_windows {
5880 sql_bail!(
5881 "window functions are not allowed in {} (function {})",
5882 ecx.name,
5883 name
5884 );
5885 }
5886
5887 let window_spec = match over.as_ref() {
5888 Some(over) => over,
5889 None => sql_bail!("window function {} requires an OVER clause", name),
5890 };
5891 if window_spec.ignore_nulls && window_spec.respect_nulls {
5892 sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5893 }
5894 let window_frame = match window_spec.window_frame.as_ref() {
5895 Some(frame) => plan_window_frame(frame)?,
5896 None => mz_expr::WindowFrame::default(),
5897 };
5898 let mut partition = Vec::new();
5899 for expr in &window_spec.partition_by {
5900 partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5901 }
5902
5903 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5904
5905 Ok((
5906 window_spec.ignore_nulls,
5907 order_by_exprs,
5908 col_orders,
5909 window_frame,
5910 partition,
5911 ))
5912}
5913
5914fn plan_window_frame(
5915 WindowFrame {
5916 units,
5917 start_bound,
5918 end_bound,
5919 }: &WindowFrame,
5920) -> Result<mz_expr::WindowFrame, PlanError> {
5921 use mz_expr::WindowFrameBound::*;
5922 let units = window_frame_unit_ast_to_expr(units)?;
5923 let start_bound = window_frame_bound_ast_to_expr(start_bound);
5924 let end_bound = end_bound
5925 .as_ref()
5926 .map(window_frame_bound_ast_to_expr)
5927 .unwrap_or(CurrentRow);
5928
5929 match (&start_bound, &end_bound) {
5931 (UnboundedFollowing, _) => {
5933 sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5934 }
5935 (_, UnboundedPreceding) => {
5937 sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5938 }
5939 (CurrentRow, OffsetPreceding(_)) => {
5941 sql_bail!("frame starting from current row cannot have preceding rows")
5942 }
5943 (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5944 sql_bail!("frame starting from following row cannot have preceding rows")
5945 }
5946 (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5949 if *o1 > 1000000 || *o2 > 1000000 {
5953 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5954 }
5955 }
5956 (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5957 if *o1 > 1000000 || *o2 > 1000000 {
5958 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5959 }
5960 }
5961 (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5962 if *o1 > 1000000 || *o2 > 1000000 {
5963 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5964 }
5965 }
5966 (OffsetPreceding(o), CurrentRow) => {
5967 if *o > 1000000 {
5968 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5969 }
5970 }
5971 (CurrentRow, OffsetFollowing(o)) => {
5972 if *o > 1000000 {
5973 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5974 }
5975 }
5976 (_, _) => (),
5978 }
5979
5980 if units == mz_expr::WindowFrameUnits::Range
5983 && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5984 {
5985 bail_unsupported!("RANGE in non-default window frames")
5986 }
5987
5988 let frame = mz_expr::WindowFrame {
5989 units,
5990 start_bound,
5991 end_bound,
5992 };
5993 Ok(frame)
5994}
5995
5996fn window_frame_unit_ast_to_expr(
5997 unit: &WindowFrameUnits,
5998) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5999 match unit {
6000 WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
6001 WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
6002 WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
6003 }
6004}
6005
6006fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
6007 match bound {
6008 WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
6009 WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
6010 WindowFrameBound::Preceding(Some(offset)) => {
6011 mz_expr::WindowFrameBound::OffsetPreceding(*offset)
6012 }
6013 WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
6014 WindowFrameBound::Following(Some(offset)) => {
6015 mz_expr::WindowFrameBound::OffsetFollowing(*offset)
6016 }
6017 }
6018}
6019
6020pub fn scalar_type_from_sql(
6021 scx: &StatementContext,
6022 data_type: &ResolvedDataType,
6023) -> Result<SqlScalarType, PlanError> {
6024 match data_type {
6025 ResolvedDataType::AnonymousList(elem_type) => {
6026 let elem_type = scalar_type_from_sql(scx, elem_type)?;
6027 if matches!(elem_type, SqlScalarType::Char { .. }) {
6028 bail_unsupported!("char list");
6029 }
6030 Ok(SqlScalarType::List {
6031 element_type: Box::new(elem_type),
6032 custom_id: None,
6033 })
6034 }
6035 ResolvedDataType::AnonymousMap {
6036 key_type,
6037 value_type,
6038 } => {
6039 match scalar_type_from_sql(scx, key_type)? {
6040 SqlScalarType::String => {}
6041 other => sql_bail!(
6042 "map key type must be {}, got {}",
6043 scx.humanize_sql_scalar_type(&SqlScalarType::String, false),
6044 scx.humanize_sql_scalar_type(&other, false)
6045 ),
6046 }
6047 Ok(SqlScalarType::Map {
6048 value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
6049 custom_id: None,
6050 })
6051 }
6052 ResolvedDataType::Named { id, modifiers, .. } => {
6053 scalar_type_from_catalog(scx.catalog, *id, modifiers)
6054 }
6055 ResolvedDataType::Error => bail_internal!("should have been caught in name resolution"),
6056 }
6057}
6058
6059pub fn scalar_type_from_catalog(
6060 catalog: &dyn SessionCatalog,
6061 id: CatalogItemId,
6062 modifiers: &[i64],
6063) -> Result<SqlScalarType, PlanError> {
6064 let entry = catalog.get_item(&id);
6065 let type_details = match entry.type_details() {
6066 Some(type_details) => type_details,
6067 None => {
6068 sql_bail!(
6071 "internal error: {} does not refer to a type",
6072 catalog.resolve_full_name(entry.name()).to_string().quoted()
6073 );
6074 }
6075 };
6076 match &type_details.typ {
6077 CatalogType::Numeric => {
6078 let mut modifiers = modifiers.iter().fuse();
6079 let precision = match modifiers.next() {
6080 Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6081 sql_bail!(
6082 "precision for type numeric must be between 1 and {}",
6083 NUMERIC_DATUM_MAX_PRECISION,
6084 );
6085 }
6086 Some(p) => Some(*p),
6087 None => None,
6088 };
6089 let scale = match modifiers.next() {
6090 Some(scale) => {
6091 if let Some(precision) = precision {
6092 if *scale > precision {
6093 sql_bail!(
6094 "scale for type numeric must be between 0 and precision {}",
6095 precision
6096 );
6097 }
6098 }
6099 Some(NumericMaxScale::try_from(*scale)?)
6100 }
6101 None => None,
6102 };
6103 if modifiers.next().is_some() {
6104 sql_bail!("type numeric supports at most two type modifiers");
6105 }
6106 Ok(SqlScalarType::Numeric { max_scale: scale })
6107 }
6108 CatalogType::Char => {
6109 let mut modifiers = modifiers.iter().fuse();
6110 let length = match modifiers.next() {
6111 Some(l) => Some(CharLength::try_from(*l)?),
6112 None => Some(CharLength::ONE),
6113 };
6114 if modifiers.next().is_some() {
6115 sql_bail!("type character supports at most one type modifier");
6116 }
6117 Ok(SqlScalarType::Char { length })
6118 }
6119 CatalogType::VarChar => {
6120 let mut modifiers = modifiers.iter().fuse();
6121 let length = match modifiers.next() {
6122 Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6123 None => None,
6124 };
6125 if modifiers.next().is_some() {
6126 sql_bail!("type character varying supports at most one type modifier");
6127 }
6128 Ok(SqlScalarType::VarChar { max_length: length })
6129 }
6130 CatalogType::Timestamp => {
6131 let mut modifiers = modifiers.iter().fuse();
6132 let precision = match modifiers.next() {
6133 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6134 None => None,
6135 };
6136 if modifiers.next().is_some() {
6137 sql_bail!("type timestamp supports at most one type modifier");
6138 }
6139 Ok(SqlScalarType::Timestamp { precision })
6140 }
6141 CatalogType::TimestampTz => {
6142 let mut modifiers = modifiers.iter().fuse();
6143 let precision = match modifiers.next() {
6144 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6145 None => None,
6146 };
6147 if modifiers.next().is_some() {
6148 sql_bail!("type timestamp with time zone supports at most one type modifier");
6149 }
6150 Ok(SqlScalarType::TimestampTz { precision })
6151 }
6152 t => {
6153 if !modifiers.is_empty() {
6154 sql_bail!(
6155 "{} does not support type modifiers",
6156 catalog.resolve_full_name(entry.name()).to_string()
6157 );
6158 }
6159 match t {
6160 CatalogType::Array {
6161 element_reference: element_id,
6162 } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6163 catalog,
6164 *element_id,
6165 modifiers,
6166 )?))),
6167 CatalogType::List {
6168 element_reference: element_id,
6169 element_modifiers,
6170 } => Ok(SqlScalarType::List {
6171 element_type: Box::new(scalar_type_from_catalog(
6172 catalog,
6173 *element_id,
6174 element_modifiers,
6175 )?),
6176 custom_id: Some(id),
6177 }),
6178 CatalogType::Map {
6179 key_reference: _,
6180 key_modifiers: _,
6181 value_reference: value_id,
6182 value_modifiers,
6183 } => Ok(SqlScalarType::Map {
6184 value_type: Box::new(scalar_type_from_catalog(
6185 catalog,
6186 *value_id,
6187 value_modifiers,
6188 )?),
6189 custom_id: Some(id),
6190 }),
6191 CatalogType::Range {
6192 element_reference: element_id,
6193 } => Ok(SqlScalarType::Range {
6194 element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6195 }),
6196 CatalogType::Record { fields } => {
6197 let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6198 .iter()
6199 .map(|f| {
6200 let scalar_type = scalar_type_from_catalog(
6201 catalog,
6202 f.type_reference,
6203 &f.type_modifiers,
6204 )?;
6205 Ok((
6206 f.name.clone(),
6207 SqlColumnType {
6208 scalar_type,
6209 nullable: true,
6210 },
6211 ))
6212 })
6213 .collect::<Result<Box<_>, PlanError>>()?;
6214 Ok(SqlScalarType::Record {
6215 fields: scalars,
6216 custom_id: Some(id),
6217 })
6218 }
6219 CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6220 CatalogType::Bool => Ok(SqlScalarType::Bool),
6221 CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6222 CatalogType::Date => Ok(SqlScalarType::Date),
6223 CatalogType::Float32 => Ok(SqlScalarType::Float32),
6224 CatalogType::Float64 => Ok(SqlScalarType::Float64),
6225 CatalogType::Int16 => Ok(SqlScalarType::Int16),
6226 CatalogType::Int32 => Ok(SqlScalarType::Int32),
6227 CatalogType::Int64 => Ok(SqlScalarType::Int64),
6228 CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6229 CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6230 CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6231 CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6232 CatalogType::Interval => Ok(SqlScalarType::Interval),
6233 CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6234 CatalogType::Oid => Ok(SqlScalarType::Oid),
6235 CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6236 CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6237 CatalogType::Pseudo => {
6238 sql_bail!(
6239 "cannot reference pseudo type {}",
6240 catalog.resolve_full_name(entry.name()).to_string()
6241 )
6242 }
6243 CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6244 CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6245 CatalogType::RegType => Ok(SqlScalarType::RegType),
6246 CatalogType::String => Ok(SqlScalarType::String),
6247 CatalogType::Time => Ok(SqlScalarType::Time),
6248 CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6249 CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6250 CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6251 CatalogType::Numeric => unreachable!("handled above"),
6252 CatalogType::Char => unreachable!("handled above"),
6253 CatalogType::VarChar => unreachable!("handled above"),
6254 CatalogType::Timestamp => unreachable!("handled above"),
6255 CatalogType::TimestampTz => unreachable!("handled above"),
6256 }
6257 }
6258 }
6259}
6260
6261struct AggregateTableFuncVisitor<'a> {
6264 scx: &'a StatementContext<'a>,
6265 aggs: Vec<Function<Aug>>,
6266 within_aggregate: bool,
6267 tables: BTreeMap<Function<Aug>, String>,
6268 table_disallowed_context: Vec<&'static str>,
6269 in_select_item: bool,
6270 id_gen: IdGen,
6271 err: Option<PlanError>,
6272}
6273
6274impl<'a> AggregateTableFuncVisitor<'a> {
6275 fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6276 AggregateTableFuncVisitor {
6277 scx,
6278 aggs: Vec::new(),
6279 within_aggregate: false,
6280 tables: BTreeMap::new(),
6281 table_disallowed_context: Vec::new(),
6282 in_select_item: false,
6283 id_gen: Default::default(),
6284 err: None,
6285 }
6286 }
6287
6288 fn into_result(
6289 self,
6290 ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6291 match self.err {
6292 Some(err) => Err(err),
6293 None => {
6294 let mut seen = BTreeSet::new();
6297 let aggs = self
6298 .aggs
6299 .into_iter()
6300 .filter(move |agg| seen.insert(agg.clone()))
6301 .collect();
6302 Ok((aggs, self.tables))
6303 }
6304 }
6305 }
6306}
6307
6308impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
6309 fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
6310 let item = match self.scx.get_item_by_resolved_name(&func.name) {
6311 Ok(i) => i,
6312 Err(_) => return,
6314 };
6315
6316 match item.func() {
6317 Ok(Func::Aggregate { .. }) if func.over.is_none() => {
6320 if self.within_aggregate {
6321 self.err = Some(sql_err!("nested aggregate functions are not allowed",));
6322 return;
6323 }
6324 self.aggs.push(func.clone());
6325 let Function {
6326 name: _,
6327 args,
6328 filter,
6329 over: _,
6330 distinct: _,
6331 } = func;
6332 if let Some(filter) = filter {
6333 self.visit_expr_mut(filter);
6334 }
6335 let old_within_aggregate = self.within_aggregate;
6336 self.within_aggregate = true;
6337 self.table_disallowed_context
6338 .push("aggregate function calls");
6339
6340 self.visit_function_args_mut(args);
6341
6342 self.within_aggregate = old_within_aggregate;
6343 self.table_disallowed_context.pop();
6344 }
6345 Ok(Func::Table { .. }) => {
6346 self.table_disallowed_context.push("other table functions");
6347 visit_mut::visit_function_mut(self, func);
6348 self.table_disallowed_context.pop();
6349 }
6350 _ => visit_mut::visit_function_mut(self, func),
6351 }
6352 }
6353
6354 fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
6355 }
6357
6358 fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
6359 let (disallowed_context, func) = match expr {
6360 Expr::Case { .. } => (Some("CASE"), None),
6361 Expr::HomogenizingFunction {
6362 function: HomogenizingFunction::Coalesce,
6363 ..
6364 } => (Some("COALESCE"), None),
6365 Expr::Function(func) if self.in_select_item => {
6366 let mut table_func = None;
6369 if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
6370 if let Ok(Func::Table { .. }) = item.func() {
6371 if let Some(context) = self.table_disallowed_context.last() {
6372 self.err = Some(sql_err!(
6373 "table functions are not allowed in {} (function {})",
6374 context,
6375 func.name
6376 ));
6377 return;
6378 }
6379 table_func = Some(func.clone());
6380 }
6381 }
6382 (None, table_func)
6385 }
6386 _ => (None, None),
6387 };
6388 if let Some(func) = func {
6389 visit_mut::visit_expr_mut(self, expr);
6391 if let Function {
6393 name: _,
6394 args: _,
6395 filter: None,
6396 over: None,
6397 distinct: false,
6398 } = &func
6399 {
6400 let unique_id = self.id_gen.allocate_id();
6402 let id = self
6403 .tables
6404 .entry(func)
6405 .or_insert_with(|| format!("table_func_{unique_id}"));
6406 *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
6409 }
6410 }
6411 if let Some(context) = disallowed_context {
6412 self.table_disallowed_context.push(context);
6413 }
6414
6415 visit_mut::visit_expr_mut(self, expr);
6416
6417 if disallowed_context.is_some() {
6418 self.table_disallowed_context.pop();
6419 }
6420 }
6421
6422 fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
6423 let old = self.in_select_item;
6424 self.in_select_item = true;
6425 visit_mut::visit_select_item_mut(self, si);
6426 self.in_select_item = old;
6427 }
6428}
6429
6430#[derive(Default)]
6431struct WindowFuncCollector {
6432 window_funcs: Vec<Expr<Aug>>,
6433}
6434
6435impl WindowFuncCollector {
6436 fn into_result(self) -> Vec<Expr<Aug>> {
6437 let mut seen = BTreeSet::new();
6439 let window_funcs_dedupped = self
6440 .window_funcs
6441 .into_iter()
6442 .filter(move |expr| seen.insert(expr.clone()))
6443 .rev()
6446 .collect();
6447 window_funcs_dedupped
6448 }
6449}
6450
6451impl Visit<'_, Aug> for WindowFuncCollector {
6452 fn visit_expr(&mut self, expr: &Expr<Aug>) {
6453 match expr {
6454 Expr::Function(func) => {
6455 if func.over.is_some() {
6456 self.window_funcs.push(expr.clone());
6457 }
6458 }
6459 _ => (),
6460 }
6461 visit::visit_expr(self, expr);
6462 }
6463
6464 fn visit_query(&mut self, _query: &Query<Aug>) {
6465 }
6467}
6468
6469#[derive(Debug, Eq, PartialEq, Copy, Clone)]
6471pub enum QueryLifetime {
6472 OneShot,
6474 Index,
6476 MaterializedView,
6478 Subscribe,
6480 View,
6482 Source,
6484}
6485
6486impl QueryLifetime {
6487 pub fn is_one_shot(&self) -> bool {
6491 let result = match self {
6492 QueryLifetime::OneShot => true,
6493 QueryLifetime::Index => false,
6494 QueryLifetime::MaterializedView => false,
6495 QueryLifetime::Subscribe => false,
6496 QueryLifetime::View => false,
6497 QueryLifetime::Source => false,
6498 };
6499 assert_eq!(!result, self.is_maintained());
6500 result
6501 }
6502
6503 pub fn is_maintained(&self) -> bool {
6506 match self {
6507 QueryLifetime::OneShot => false,
6508 QueryLifetime::Index => true,
6509 QueryLifetime::MaterializedView => true,
6510 QueryLifetime::Subscribe => true,
6511 QueryLifetime::View => true,
6512 QueryLifetime::Source => true,
6513 }
6514 }
6515
6516 pub fn allow_show(&self) -> bool {
6518 match self {
6519 QueryLifetime::OneShot => true,
6520 QueryLifetime::Index => false,
6521 QueryLifetime::MaterializedView => false,
6522 QueryLifetime::Subscribe => true, QueryLifetime::View => false,
6524 QueryLifetime::Source => false,
6525 }
6526 }
6527}
6528
6529#[derive(Debug, Clone)]
6531pub struct CteDesc {
6532 pub name: String,
6533 pub desc: RelationDesc,
6534}
6535
6536#[derive(Debug, Clone)]
6538pub struct QueryContext<'a> {
6539 pub scx: &'a StatementContext<'a>,
6541 pub lifetime: QueryLifetime,
6543 pub outer_scopes: Vec<Scope>,
6545 pub outer_relation_types: Vec<SqlRelationType>,
6547 pub ctes: BTreeMap<LocalId, CteDesc>,
6549 pub name_manager: Rc<RefCell<NameManager>>,
6551 pub recursion_guard: RecursionGuard,
6552}
6553
6554impl CheckedRecursion for QueryContext<'_> {
6555 fn recursion_guard(&self) -> &RecursionGuard {
6556 &self.recursion_guard
6557 }
6558}
6559
6560impl<'a> QueryContext<'a> {
6561 pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6562 QueryContext {
6563 scx,
6564 lifetime,
6565 outer_scopes: vec![],
6566 outer_relation_types: vec![],
6567 ctes: BTreeMap::new(),
6568 name_manager: Rc::new(RefCell::new(NameManager::new())),
6569 recursion_guard: RecursionGuard::with_limit(1024), }
6571 }
6572
6573 fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6574 expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6575 }
6576
6577 fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6580 let ctes = self.ctes.clone();
6581 let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6582 let outer_relation_types = iter::once(relation_type)
6583 .chain(self.outer_relation_types.clone())
6584 .collect();
6585 let name_manager = Rc::clone(&self.name_manager);
6587
6588 QueryContext {
6589 scx: self.scx,
6590 lifetime: self.lifetime,
6591 outer_scopes,
6592 outer_relation_types,
6593 ctes,
6594 name_manager,
6595 recursion_guard: self.recursion_guard.clone(),
6596 }
6597 }
6598
6599 fn empty_derived_context(&self) -> QueryContext<'a> {
6601 let scope = Scope::empty();
6602 let ty = SqlRelationType::empty();
6603 self.derived_context(scope, ty)
6604 }
6605
6606 pub fn resolve_table_name(
6609 &self,
6610 object: ResolvedItemName,
6611 ) -> Result<(HirRelationExpr, Scope), PlanError> {
6612 match object {
6613 ResolvedItemName::Item {
6614 id,
6615 full_name,
6616 version,
6617 ..
6618 } => {
6619 let item = self.scx.get_item(&id).at_version(version);
6620 let desc = match item.relation_desc() {
6621 Some(desc) => desc.clone(),
6622 None => {
6623 return Err(PlanError::InvalidDependency {
6624 name: full_name.to_string(),
6625 item_type: item.item_type().to_string(),
6626 });
6627 }
6628 };
6629 let expr = HirRelationExpr::Get {
6630 id: Id::Global(item.global_id()),
6631 typ: desc.typ().clone(),
6632 };
6633
6634 let name = full_name.into();
6635 let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6636
6637 Ok((expr, scope))
6638 }
6639 ResolvedItemName::Cte { id, name } => {
6640 let name = name.into();
6641 let cte = self.ctes.get(&id).unwrap();
6642 let expr = HirRelationExpr::Get {
6643 id: Id::Local(id),
6644 typ: cte.desc.typ().clone(),
6645 };
6646
6647 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6648
6649 Ok((expr, scope))
6650 }
6651 ResolvedItemName::Error => bail_internal!("should have been caught in name resolution"),
6652 }
6653 }
6654
6655 pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6658 self.scx.humanize_sql_scalar_type(typ, postgres_compat)
6659 }
6660}
6661
6662#[derive(Debug, Clone)]
6664pub struct ExprContext<'a> {
6665 pub qcx: &'a QueryContext<'a>,
6666 pub name: &'a str,
6668 pub scope: &'a Scope,
6671 pub relation_type: &'a SqlRelationType,
6674 pub allow_aggregates: bool,
6676 pub allow_subqueries: bool,
6678 pub allow_parameters: bool,
6680 pub allow_windows: bool,
6682}
6683
6684impl CheckedRecursion for ExprContext<'_> {
6685 fn recursion_guard(&self) -> &RecursionGuard {
6686 &self.qcx.recursion_guard
6687 }
6688}
6689
6690impl<'a> ExprContext<'a> {
6691 pub fn catalog(&self) -> &dyn SessionCatalog {
6692 self.qcx.scx.catalog
6693 }
6694
6695 pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6696 let mut ecx = self.clone();
6697 ecx.name = name;
6698 ecx
6699 }
6700
6701 pub fn column_type<E>(&self, expr: &E) -> E::Type
6702 where
6703 E: AbstractExpr,
6704 {
6705 expr.typ(
6706 &self.qcx.outer_relation_types,
6707 self.relation_type,
6708 &self.qcx.scx.param_types.borrow(),
6709 )
6710 }
6711
6712 pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6713 where
6714 E: AbstractExpr,
6715 {
6716 self.column_type(expr).scalar_type()
6717 }
6718
6719 fn derived_query_context(&self) -> QueryContext<'_> {
6720 let mut scope = self.scope.clone();
6721 scope.lateral_barrier = true;
6722 self.qcx.derived_context(scope, self.relation_type.clone())
6723 }
6724
6725 pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6726 self.qcx.scx.require_feature_flag(flag)
6727 }
6728
6729 pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6730 &self.qcx.scx.param_types
6731 }
6732
6733 pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6736 self.qcx.scx.humanize_sql_scalar_type(typ, postgres_compat)
6737 }
6738
6739 pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6740 self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6741 }
6742}
6743
6744#[derive(Debug, Clone)]
6750pub struct NameManager(BTreeSet<Arc<str>>);
6751
6752impl NameManager {
6753 pub fn new() -> Self {
6755 Self(BTreeSet::new())
6756 }
6757
6758 fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6761 let s = s.as_ref();
6762 if let Some(interned) = self.0.get(s) {
6763 Arc::clone(interned)
6764 } else {
6765 let interned: Arc<str> = Arc::from(s);
6766 self.0.insert(Arc::clone(&interned));
6767 interned
6768 }
6769 }
6770
6771 pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6774 self.intern(item.column_name.as_str())
6790 }
6791}
6792
6793#[cfg(test)]
6794mod test {
6795 use super::*;
6796
6797 #[mz_ore::test]
6802 pub fn test_name_manager_string_interning() {
6803 let mut nm = NameManager::new();
6804
6805 let orig_hi = "hi";
6806 let hi = nm.intern(orig_hi);
6807 let hello = nm.intern("hello");
6808
6809 assert_ne!(hi.as_ptr(), hello.as_ptr());
6810
6811 let hi2 = nm.intern("hi");
6813 assert_eq!(hi.as_ptr(), hi2.as_ptr());
6814
6815 let s = format!(
6817 "{}{}",
6818 hi.chars().nth(0).unwrap(),
6819 hi2.chars().nth(1).unwrap()
6820 );
6821 assert_ne!(orig_hi.as_ptr(), s.as_ptr());
6823
6824 let hi3 = nm.intern(s);
6825 assert_eq!(hi.as_ptr(), hi3.as_ptr());
6826 }
6827}