1use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::func::variadic::{
50 ArrayCreate, ArrayIndex, Coalesce, Greatest, Least, ListCreate, ListIndex, ListSliceLinear,
51 MapBuild, RecordCreate,
52};
53use mz_expr::virtual_syntax::AlgExcept;
54use mz_expr::{
55 Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, REPEAT_ROW_NAME, RowSetFinishing,
56 TableFunc, func as expr_func,
57};
58use mz_ore::assert_none;
59use mz_ore::collections::CollectionExt;
60use mz_ore::error::ErrorExt;
61use mz_ore::id_gen::IdGen;
62use mz_ore::option::FallibleMapExt;
63use mz_ore::stack::{CheckedRecursion, RecursionGuard};
64use mz_ore::str::StrExt;
65use mz_repr::adt::char::CharLength;
66use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
67use mz_repr::adt::timestamp::TimestampPrecision;
68use mz_repr::adt::varchar::VarCharMaxLength;
69use mz_repr::namespaces::MZ_CATALOG_SCHEMA;
70use mz_repr::{
71 CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector,
72 ReprColumnType, Row, RowArena, SqlColumnType, SqlRelationType, SqlScalarType,
73 UNKNOWN_COLUMN_NAME, strconv,
74};
75use mz_sql_parser::ast::display::AstDisplay;
76use mz_sql_parser::ast::visit::Visit;
77use mz_sql_parser::ast::visit_mut::{self, VisitMut};
78use mz_sql_parser::ast::{
79 AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
80 CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
81 Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
82 JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
83 MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
84 SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
85 TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
86 WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
87};
88use mz_sql_parser::ident;
89
90use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
91use crate::func::{self, Func, FuncSpec, TableFuncImpl};
92use crate::names::{
93 Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
94};
95use crate::plan::PlanError::InvalidWmrRecursionLimit;
96use crate::plan::error::PlanError;
97use crate::plan::hir::{
98 AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
99 BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
100 HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
101 ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
102};
103use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
104use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
105use crate::plan::statement::{StatementContext, StatementDesc, show};
106use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
107use crate::plan::{
108 Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
109 literal, transform_ast,
110};
111use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
112use crate::session::vars::{self, FeatureFlag};
113use crate::{ORDINALITY_COL_NAME, normalize};
114
115#[derive(Debug)]
116pub struct PlannedRootQuery<E> {
117 pub expr: E,
118 pub desc: RelationDesc,
119 pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
120 pub scope: Scope,
121}
122
123#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
132pub fn plan_root_query(
133 scx: &StatementContext,
134 mut query: Query<Aug>,
135 lifetime: QueryLifetime,
136) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
137 transform_ast::transform(scx, &mut query)?;
138 let mut qcx = QueryContext::root(scx, lifetime);
139 let PlannedQuery {
140 mut expr,
141 scope,
142 order_by,
143 limit,
144 offset,
145 project,
146 group_size_hints,
147 } = plan_query(&mut qcx, &query)?;
148
149 let mut finishing = RowSetFinishing {
150 limit,
151 offset,
152 project,
153 order_by,
154 };
155
156 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
162
163 if lifetime.is_maintained() {
164 expr.finish_maintained(&mut finishing, group_size_hints);
165 }
166
167 let typ = qcx.relation_type(&expr);
168 let typ = SqlRelationType::new(
169 finishing
170 .project
171 .iter()
172 .map(|i| typ.column_types[*i].clone())
173 .collect(),
174 );
175 let desc = RelationDesc::new(typ, scope.column_names());
176
177 Ok(PlannedRootQuery {
178 expr,
179 desc,
180 finishing,
181 scope,
182 })
183}
184
185fn try_push_projection_order_by(
196 expr: &mut HirRelationExpr,
197 project: &mut Vec<usize>,
198 order_by: &mut Vec<ColumnOrder>,
199) -> bool {
200 let mut unproject = vec![None; expr.arity()];
201 for (out_i, in_i) in project.iter().copied().enumerate() {
202 unproject[in_i] = Some(out_i);
203 }
204 if order_by
205 .iter()
206 .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
207 {
208 let trivial_project = (0..project.len()).collect();
209 *expr = expr.take().project(mem::replace(project, trivial_project));
210 for ob in order_by {
211 ob.column = unproject[ob.column].unwrap();
212 }
213 true
214 } else {
215 false
216 }
217}
218
219pub fn plan_insert_query(
220 scx: &StatementContext,
221 table_name: ResolvedItemName,
222 columns: Vec<Ident>,
223 source: InsertSource<Aug>,
224 returning: Vec<SelectItem<Aug>>,
225) -> Result<
226 (
227 CatalogItemId,
228 HirRelationExpr,
229 PlannedRootQuery<Vec<HirScalarExpr>>,
230 ),
231 PlanError,
232> {
233 let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
234 let table = scx.get_item_by_resolved_name(&table_name)?;
235
236 if table.item_type() != CatalogItemType::Table {
238 sql_bail!(
239 "cannot insert into {} '{}'",
240 table.item_type(),
241 table_name.full_name_str()
242 );
243 }
244 let desc = table
245 .relation_desc()
246 .ok_or_else(|| sql_err!("item does not have a relation description"))?;
247 let mut defaults = table
248 .writable_table_details()
249 .ok_or_else(|| {
250 sql_err!(
251 "cannot insert into non-writeable table '{}'",
252 table_name.full_name_str()
253 )
254 })?
255 .to_vec();
256
257 for default in &mut defaults {
258 transform_ast::transform(scx, default)?;
259 }
260
261 if table.id().is_system() {
262 sql_bail!(
263 "cannot insert into system table '{}'",
264 table_name.full_name_str()
265 );
266 }
267
268 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
269
270 let mut source_types = Vec::with_capacity(columns.len());
272 let mut ordering = Vec::with_capacity(columns.len());
273
274 if columns.is_empty() {
275 source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
278 ordering.extend(0..desc.arity());
279 } else {
280 let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
281 .iter()
282 .enumerate()
283 .map(|(idx, (name, typ))| (name, (idx, typ)))
284 .collect();
285
286 for c in &columns {
287 if let Some((idx, typ)) = column_by_name.get(c) {
288 ordering.push(*idx);
289 source_types.push(&typ.scalar_type);
290 } else {
291 sql_bail!(
292 "column {} of relation {} does not exist",
293 c.quoted(),
294 table_name.full_name_str().quoted()
295 );
296 }
297 }
298 if let Some(dup) = columns.iter().duplicates().next() {
299 sql_bail!("column {} specified more than once", dup.quoted());
300 }
301 };
302
303 let expr = match source {
305 InsertSource::Query(mut query) => {
306 transform_ast::transform(scx, &mut query)?;
307
308 match query {
309 Query {
311 body: SetExpr::Values(Values(values)),
312 ctes,
313 order_by,
314 limit: None,
315 offset: None,
316 } if ctes.is_empty() && order_by.is_empty() => {
317 let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
318 plan_values_insert(&qcx, &names, &source_types, &values)?
319 }
320 _ => {
321 let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
322 expr
323 }
324 }
325 }
326 InsertSource::DefaultValues => {
327 HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
328 }
329 };
330
331 let expr_arity = expr.arity();
332
333 let max_columns = if columns.is_empty() {
336 desc.arity()
337 } else {
338 columns.len()
339 };
340 if expr_arity > max_columns {
341 sql_bail!("INSERT has more expressions than target columns");
342 }
343 if expr_arity < columns.len() {
345 sql_bail!("INSERT has more target columns than expressions");
346 }
347
348 source_types.truncate(expr_arity);
350 ordering.truncate(expr_arity);
351
352 let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
355 sql_err!(
356 "column {} is of type {} but expression is of type {}",
357 desc.get_name(ordering[e.column]).quoted(),
358 qcx.humanize_sql_scalar_type(&e.target_type, false),
359 qcx.humanize_sql_scalar_type(&e.source_type, false),
360 )
361 })?;
362
363 let mut map_exprs = vec![];
365 let mut project_key = Vec::with_capacity(desc.arity());
366
367 let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();
369
370 let column_details = desc.iter_types().zip_eq(defaults).enumerate();
371 for (col_idx, (col_typ, default)) in column_details {
372 if let Some(src_idx) = col_to_source.get(&col_idx) {
373 project_key.push(*src_idx);
374 } else {
375 let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
376 project_key.push(expr_arity + map_exprs.len());
377 map_exprs.push(hir);
378 }
379 }
380
381 let returning = {
382 let (scope, typ) = if let ResolvedItemName::Item {
383 full_name,
384 version: _,
385 ..
386 } = table_name
387 {
388 let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
389 let typ = desc.typ().clone();
390 (scope, typ)
391 } else {
392 (Scope::empty(), SqlRelationType::empty())
393 };
394 let ecx = &ExprContext {
395 qcx: &qcx,
396 name: "RETURNING clause",
397 scope: &scope,
398 relation_type: &typ,
399 allow_aggregates: false,
400 allow_subqueries: false,
401 allow_parameters: true,
402 allow_windows: false,
403 };
404 let table_func_names = BTreeMap::new();
405 let mut output_columns = vec![];
406 let mut new_exprs = vec![];
407 let mut new_type = SqlRelationType::empty();
408 for mut si in returning {
409 transform_ast::transform(scx, &mut si)?;
410 for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
411 let expr = match &select_item {
412 ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
413 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
414 };
415 output_columns.push(column_name);
416 let typ = ecx.column_type(&expr);
417 new_type.column_types.push(typ);
418 new_exprs.push(expr);
419 }
420 }
421 let desc = RelationDesc::new(new_type, output_columns);
422 let desc_arity = desc.arity();
423 PlannedRootQuery {
424 expr: new_exprs,
425 desc,
426 finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
427 scope,
428 }
429 };
430
431 Ok((
432 table.id(),
433 expr.map(map_exprs).project(project_key),
434 returning,
435 ))
436}
437
438pub fn plan_copy_item(
449 scx: &StatementContext,
450 item_name: ResolvedItemName,
451 columns: Vec<Ident>,
452) -> Result<
453 (
454 CatalogItemId,
455 RelationDesc,
456 Vec<ColumnIndex>,
457 Option<MapFilterProject>,
458 ),
459 PlanError,
460> {
461 let item = scx.get_item_by_resolved_name(&item_name)?;
462 let fullname = scx.catalog.resolve_full_name(item.name());
463 let table_desc = match item.relation_desc() {
464 Some(desc) => desc.into_owned(),
465 None => {
466 return Err(PlanError::InvalidDependency {
467 name: fullname.to_string(),
468 item_type: item.item_type().to_string(),
469 });
470 }
471 };
472 let mut ordering = Vec::with_capacity(columns.len());
473
474 let mfp = if let Some(table_defaults) = item.writable_table_details() {
484 let mut table_defaults = table_defaults.to_vec();
485
486 for default in &mut table_defaults {
487 transform_ast::transform(scx, default)?;
488 }
489
490 let source_column_names: Vec<_> = columns
492 .iter()
493 .cloned()
494 .map(normalize::column_name)
495 .collect();
496
497 let mut default_exprs = Vec::new();
498 let mut project_keys = Vec::with_capacity(table_desc.arity());
499
500 let column_details = table_desc.iter().zip_eq(table_defaults);
503 for ((col_name, col_type), col_default) in column_details {
504 let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
505 if let Some(src_idx) = maybe_src_idx {
506 project_keys.push(src_idx);
507 } else {
508 let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
511 let mir = hir.lower_uncorrelated(scx.catalog.system_vars())?;
512 project_keys.push(source_column_names.len() + default_exprs.len());
513 default_exprs.push(mir);
514 }
515 }
516
517 let mfp = MapFilterProject::new(source_column_names.len())
518 .map(default_exprs)
519 .project(project_keys);
520 Some(mfp)
521 } else {
522 None
523 };
524
525 let source_desc = if columns.is_empty() {
527 let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
528 ordering.extend(indexes);
529
530 table_desc
532 } else {
533 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
534 let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
535 .iter_all()
536 .map(|(idx, name, typ)| (name, (*idx, typ)))
537 .collect();
538
539 let mut names = Vec::with_capacity(columns.len());
540 let mut source_types = Vec::with_capacity(columns.len());
541
542 for c in &columns {
543 if let Some((idx, typ)) = column_by_name.get(c) {
544 ordering.push(*idx);
545 source_types.push((*typ).clone());
546 names.push(c.clone());
547 } else {
548 sql_bail!(
549 "column {} of relation {} does not exist",
550 c.quoted(),
551 item_name.full_name_str().quoted()
552 );
553 }
554 }
555 if let Some(dup) = columns.iter().duplicates().next() {
556 sql_bail!("column {} specified more than once", dup.quoted());
557 }
558
559 RelationDesc::new(SqlRelationType::new(source_types), names)
561 };
562
563 Ok((item.id(), source_desc, ordering, mfp))
564}
565
566pub fn plan_copy_from(
570 scx: &StatementContext,
571 table_name: ResolvedItemName,
572 columns: Vec<Ident>,
573) -> Result<
574 (
575 CatalogItemId,
576 RelationDesc,
577 Vec<ColumnIndex>,
578 Option<MapFilterProject>,
579 ),
580 PlanError,
581> {
582 let table = scx.get_item_by_resolved_name(&table_name)?;
583
584 if table.item_type() != CatalogItemType::Table {
586 sql_bail!(
587 "cannot insert into {} '{}'",
588 table.item_type(),
589 table_name.full_name_str()
590 );
591 }
592
593 let _ = table.writable_table_details().ok_or_else(|| {
594 sql_err!(
595 "cannot insert into non-writeable table '{}'",
596 table_name.full_name_str()
597 )
598 })?;
599
600 if table.id().is_system() {
601 sql_bail!(
602 "cannot insert into system table '{}'",
603 table_name.full_name_str()
604 );
605 }
606 let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
607
608 Ok((id, desc, ordering, mfp))
609}
610
611pub fn plan_copy_from_rows(
614 pcx: &PlanContext,
615 catalog: &dyn SessionCatalog,
616 target_id: CatalogItemId,
617 target_name: String,
618 columns: Vec<ColumnIndex>,
619 rows: Vec<mz_repr::Row>,
620) -> Result<HirRelationExpr, PlanError> {
621 let scx = StatementContext::new(Some(pcx), catalog);
622
623 let table = catalog
625 .try_get_item(&target_id)
626 .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
627 .at_version(RelationVersionSelector::Latest);
628
629 let mut defaults = table
630 .writable_table_details()
631 .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
632 .to_vec();
633
634 for default in &mut defaults {
635 transform_ast::transform(&scx, default)?;
636 }
637
638 let desc = table
639 .relation_desc()
640 .ok_or_else(|| sql_err!("item does not have a relation description"))?;
641 let column_types = columns
642 .iter()
643 .map(|x| desc.get_type(x).clone())
644 .map(|mut x| {
645 x.nullable = true;
648 x
649 })
650 .collect();
651 let typ = SqlRelationType::new(column_types);
652 let expr = HirRelationExpr::Constant {
653 rows,
654 typ: typ.clone(),
655 };
656
657 let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
663 if columns == default {
664 return Ok(expr);
665 }
666
667 let mut map_exprs = vec![];
669 let mut project_key = Vec::with_capacity(desc.arity());
670
671 let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
673
674 let column_details = desc.iter_all().zip_eq(defaults);
675 for ((col_idx, _col_name, col_typ), default) in column_details {
676 if let Some(src_idx) = col_to_source.get(&col_idx) {
677 project_key.push(*src_idx);
678 } else {
679 let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
680 project_key.push(typ.arity() + map_exprs.len());
681 map_exprs.push(hir);
682 }
683 }
684
685 Ok(expr.map(map_exprs).project(project_key))
686}
687
688pub struct ReadThenWritePlan {
690 pub id: CatalogItemId,
691 pub selection: HirRelationExpr,
696 pub assignments: BTreeMap<usize, HirScalarExpr>,
698 pub finishing: RowSetFinishing,
699}
700
701pub fn plan_delete_query(
702 scx: &StatementContext,
703 mut delete_stmt: DeleteStatement<Aug>,
704) -> Result<ReadThenWritePlan, PlanError> {
705 transform_ast::transform(scx, &mut delete_stmt)?;
706
707 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
708 plan_mutation_query_inner(
709 qcx,
710 delete_stmt.table_name,
711 delete_stmt.alias,
712 delete_stmt.using,
713 vec![],
714 delete_stmt.selection,
715 )
716}
717
718pub fn plan_update_query(
719 scx: &StatementContext,
720 mut update_stmt: UpdateStatement<Aug>,
721) -> Result<ReadThenWritePlan, PlanError> {
722 transform_ast::transform(scx, &mut update_stmt)?;
723
724 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
725
726 plan_mutation_query_inner(
727 qcx,
728 update_stmt.table_name,
729 update_stmt.alias,
730 vec![],
731 update_stmt.assignments,
732 update_stmt.selection,
733 )
734}
735
736pub fn plan_mutation_query_inner(
737 qcx: QueryContext,
738 table_name: ResolvedItemName,
739 alias: Option<TableAlias>,
740 using: Vec<TableWithJoins<Aug>>,
741 assignments: Vec<Assignment<Aug>>,
742 selection: Option<Expr<Aug>>,
743) -> Result<ReadThenWritePlan, PlanError> {
744 let (id, version) = match table_name {
746 ResolvedItemName::Item { id, version, .. } => (id, version),
747 _ => sql_bail!("cannot mutate non-user table"),
748 };
749
750 let item = qcx.scx.get_item(&id).at_version(version);
752 if item.item_type() != CatalogItemType::Table {
753 sql_bail!(
754 "cannot mutate {} '{}'",
755 item.item_type(),
756 table_name.full_name_str()
757 );
758 }
759 let _ = item.writable_table_details().ok_or_else(|| {
760 sql_err!(
761 "cannot mutate non-writeable table '{}'",
762 table_name.full_name_str()
763 )
764 })?;
765 if id.is_system() {
766 sql_bail!(
767 "cannot mutate system table '{}'",
768 table_name.full_name_str()
769 );
770 }
771
772 let (mut get, scope) = qcx.resolve_table_name(table_name)?;
774 let scope = plan_table_alias(scope, alias.as_ref())?;
775 let desc = item.relation_desc().expect("table has desc");
776 let relation_type = qcx.relation_type(&get);
777
778 if using.is_empty() {
779 if let Some(expr) = selection {
780 let ecx = &ExprContext {
781 qcx: &qcx,
782 name: "WHERE clause",
783 scope: &scope,
784 relation_type: &relation_type,
785 allow_aggregates: false,
786 allow_subqueries: true,
787 allow_parameters: true,
788 allow_windows: false,
789 };
790 let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
791 get = get.filter(vec![expr]);
792 }
793 } else {
794 get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
795 }
796
797 let mut sets = BTreeMap::new();
798 for Assignment { id, value } in assignments {
799 let name = normalize::column_name(id);
801 match desc.get_by_name(&name) {
802 Some((idx, typ)) => {
803 let ecx = &ExprContext {
804 qcx: &qcx,
805 name: "SET clause",
806 scope: &scope,
807 relation_type: &relation_type,
808 allow_aggregates: false,
809 allow_subqueries: false,
810 allow_parameters: true,
811 allow_windows: false,
812 };
813 let expr = plan_expr(ecx, &value)?.cast_to(
814 ecx,
815 CastContext::Assignment,
816 &typ.scalar_type,
817 )?;
818
819 if sets.insert(idx, expr).is_some() {
820 sql_bail!("column {} set twice", name)
821 }
822 }
823 None => sql_bail!("unknown column {}", name),
824 };
825 }
826
827 let finishing = RowSetFinishing {
828 order_by: vec![],
829 limit: None,
830 offset: 0,
831 project: (0..desc.arity()).collect(),
832 };
833
834 Ok(ReadThenWritePlan {
835 id,
836 selection: get,
837 finishing,
838 assignments: sets,
839 })
840}
841
842fn handle_mutation_using_clause(
854 qcx: &QueryContext,
855 selection: Option<Expr<Aug>>,
856 using: Vec<TableWithJoins<Aug>>,
857 get: HirRelationExpr,
858 outer_scope: Scope,
859) -> Result<HirRelationExpr, PlanError> {
860 let (mut using_rel_expr, using_scope) =
864 using.into_iter().try_fold(plan_join_identity(), |l, twj| {
865 let (left, left_scope) = l;
866 plan_join(
867 qcx,
868 left,
869 left_scope,
870 &Join {
871 relation: TableFactor::NestedJoin {
872 join: Box::new(twj),
873 alias: None,
874 },
875 join_operator: JoinOperator::CrossJoin,
876 },
877 )
878 })?;
879
880 if let Some(expr) = selection {
881 let on = HirScalarExpr::literal_true();
887 let joined = using_rel_expr
888 .clone()
889 .join(get.clone(), on, JoinKind::Inner);
890 let joined_scope = using_scope.product(outer_scope)?;
891 let joined_relation_type = qcx.relation_type(&joined);
892
893 let ecx = &ExprContext {
894 qcx,
895 name: "WHERE clause",
896 scope: &joined_scope,
897 relation_type: &joined_relation_type,
898 allow_aggregates: false,
899 allow_subqueries: true,
900 allow_parameters: true,
901 allow_windows: false,
902 };
903
904 let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
906
907 let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
911 use mz_expr::visit::Visit;
913 expr.visit_mut_post(&mut |e| {
914 if let HirScalarExpr::Column(c, _name) = e {
915 if c.column >= using_rel_arity {
916 c.level += 1;
917 c.column -= using_rel_arity;
918 };
919 }
920 })?;
921
922 using_rel_expr = using_rel_expr.filter(vec![expr]);
926 } else {
927 let _joined_scope = using_scope.product(outer_scope)?;
930 }
931 Ok(get.filter(vec![using_rel_expr.exists()]))
942}
943
944#[derive(Debug)]
945pub(crate) struct CastRelationError {
946 pub(crate) column: usize,
947 pub(crate) source_type: SqlScalarType,
948 pub(crate) target_type: SqlScalarType,
949}
950
951pub(crate) fn cast_relation<'a, I>(
955 qcx: &QueryContext,
956 ccx: CastContext,
957 expr: HirRelationExpr,
958 target_types: I,
959) -> Result<HirRelationExpr, CastRelationError>
960where
961 I: IntoIterator<Item = &'a SqlScalarType>,
962{
963 let ecx = &ExprContext {
964 qcx,
965 name: "values",
966 scope: &Scope::empty(),
967 relation_type: &qcx.relation_type(&expr),
968 allow_aggregates: false,
969 allow_subqueries: true,
970 allow_parameters: true,
971 allow_windows: false,
972 };
973 let mut map_exprs = vec![];
974 let mut project_key = vec![];
975 for (i, target_typ) in target_types.into_iter().enumerate() {
976 let expr = HirScalarExpr::column(i);
977 match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
981 Ok(cast_expr) => {
982 if expr == cast_expr {
983 project_key.push(i);
985 } else {
986 project_key.push(ecx.relation_type.arity() + map_exprs.len());
988 map_exprs.push(cast_expr);
989 }
990 }
991 Err(_) => {
992 return Err(CastRelationError {
993 column: i,
994 source_type: ecx.scalar_type(&expr),
995 target_type: target_typ.clone(),
996 });
997 }
998 }
999 }
1000 Ok(expr.map(map_exprs).project(project_key))
1001}
1002
1003pub fn plan_as_of(
1006 scx: &StatementContext,
1007 as_of: Option<AsOf<Aug>>,
1008) -> Result<QueryWhen, PlanError> {
1009 match as_of {
1010 None => Ok(QueryWhen::Immediately),
1011 Some(as_of) => match as_of {
1012 AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1013 AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1014 },
1015 }
1016}
1017
1018pub fn plan_as_of_or_up_to(
1028 scx: &StatementContext,
1029 mut expr: Expr<Aug>,
1030) -> Result<mz_repr::Timestamp, PlanError> {
1031 let scope = Scope::empty();
1032 let desc = RelationDesc::empty();
1033 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1036 transform_ast::transform(scx, &mut expr)?;
1037 let ecx = &ExprContext {
1038 qcx: &qcx,
1039 name: "AS OF or UP TO",
1040 scope: &scope,
1041 relation_type: desc.typ(),
1042 allow_aggregates: false,
1043 allow_subqueries: false,
1044 allow_parameters: false,
1045 allow_windows: false,
1046 };
1047 let hir = plan_expr(ecx, &expr)?.cast_to(
1048 ecx,
1049 CastContext::Assignment,
1050 &SqlScalarType::MzTimestamp,
1051 )?;
1052 if hir.contains_unmaterializable() {
1053 bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1054 }
1055 let timestamp = hir
1062 .into_literal_mz_timestamp()
1063 .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1064 Ok(timestamp)
1065}
1066
1067pub fn plan_secret_as(
1069 scx: &StatementContext,
1070 mut expr: Expr<Aug>,
1071) -> Result<MirScalarExpr, PlanError> {
1072 let scope = Scope::empty();
1073 let desc = RelationDesc::empty();
1074 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1075
1076 transform_ast::transform(scx, &mut expr)?;
1077
1078 let ecx = &ExprContext {
1079 qcx: &qcx,
1080 name: "AS",
1081 scope: &scope,
1082 relation_type: desc.typ(),
1083 allow_aggregates: false,
1084 allow_subqueries: false,
1085 allow_parameters: false,
1086 allow_windows: false,
1087 };
1088 let expr = plan_expr(ecx, &expr)?
1089 .type_as(ecx, &SqlScalarType::Bytes)?
1090 .lower_uncorrelated(scx.catalog.system_vars())?;
1091 Ok(expr)
1092}
1093
1094pub fn plan_webhook_validate_using(
1096 scx: &StatementContext,
1097 validate_using: CreateWebhookSourceCheck<Aug>,
1098) -> Result<WebhookValidation, PlanError> {
1099 let qcx = QueryContext::root(scx, QueryLifetime::Source);
1100
1101 let CreateWebhookSourceCheck {
1102 options,
1103 using: mut expr,
1104 } = validate_using;
1105
1106 let mut column_typs = vec![];
1107 let mut column_names = vec![];
1108
1109 let (bodies, headers, secrets) = options
1110 .map(|o| (o.bodies, o.headers, o.secrets))
1111 .unwrap_or_default();
1112
1113 let mut body_tuples = vec![];
1115 for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1116 let scalar_type = use_bytes
1117 .then_some(SqlScalarType::Bytes)
1118 .unwrap_or(SqlScalarType::String);
1119 let name = alias
1120 .map(|a| a.into_string())
1121 .unwrap_or_else(|| "body".to_string());
1122
1123 column_typs.push(SqlColumnType {
1124 scalar_type,
1125 nullable: false,
1126 });
1127 column_names.push(name);
1128
1129 let column_idx = column_typs.len() - 1;
1131 assert_eq!(
1133 column_idx,
1134 column_names.len() - 1,
1135 "body column names and types don't match"
1136 );
1137 body_tuples.push((column_idx, use_bytes));
1138 }
1139
1140 let mut header_tuples = vec![];
1142
1143 for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1144 let value_type = use_bytes
1145 .then_some(SqlScalarType::Bytes)
1146 .unwrap_or(SqlScalarType::String);
1147 let name = alias
1148 .map(|a| a.into_string())
1149 .unwrap_or_else(|| "headers".to_string());
1150
1151 column_typs.push(SqlColumnType {
1152 scalar_type: SqlScalarType::Map {
1153 value_type: Box::new(value_type),
1154 custom_id: None,
1155 },
1156 nullable: false,
1157 });
1158 column_names.push(name);
1159
1160 let column_idx = column_typs.len() - 1;
1162 assert_eq!(
1164 column_idx,
1165 column_names.len() - 1,
1166 "header column names and types don't match"
1167 );
1168 header_tuples.push((column_idx, use_bytes));
1169 }
1170
1171 let mut validation_secrets = vec![];
1173
1174 for CreateWebhookSourceSecret {
1175 secret,
1176 alias,
1177 use_bytes,
1178 } in secrets
1179 {
1180 let scalar_type = use_bytes
1182 .then_some(SqlScalarType::Bytes)
1183 .unwrap_or(SqlScalarType::String);
1184
1185 column_typs.push(SqlColumnType {
1186 scalar_type,
1187 nullable: false,
1188 });
1189 let ResolvedItemName::Item {
1190 id,
1191 full_name: FullItemName { item, .. },
1192 ..
1193 } = secret
1194 else {
1195 return Err(PlanError::InvalidSecret(Box::new(secret)));
1196 };
1197
1198 let name = if let Some(alias) = alias {
1200 alias.into_string()
1201 } else {
1202 item
1203 };
1204 column_names.push(name);
1205
1206 let column_idx = column_typs.len() - 1;
1209 assert_eq!(
1211 column_idx,
1212 column_names.len() - 1,
1213 "column names and types don't match"
1214 );
1215
1216 validation_secrets.push(WebhookValidationSecret {
1217 id,
1218 column_idx,
1219 use_bytes,
1220 });
1221 }
1222
1223 let relation_typ = SqlRelationType::new(column_typs);
1224 let desc = RelationDesc::new(relation_typ, column_names.clone());
1225 let scope = Scope::from_source(None, column_names);
1226
1227 transform_ast::transform(scx, &mut expr)?;
1228
1229 let ecx = &ExprContext {
1230 qcx: &qcx,
1231 name: "CHECK",
1232 scope: &scope,
1233 relation_type: desc.typ(),
1234 allow_aggregates: false,
1235 allow_subqueries: false,
1236 allow_parameters: false,
1237 allow_windows: false,
1238 };
1239 let expr = plan_expr(ecx, &expr)?
1240 .type_as(ecx, &SqlScalarType::Bool)?
1241 .lower_uncorrelated(scx.catalog.system_vars())?;
1242 let validation = WebhookValidation {
1243 expression: expr,
1244 relation_desc: desc,
1245 bodies: body_tuples,
1246 headers: header_tuples,
1247 secrets: validation_secrets,
1248 };
1249 Ok(validation)
1250}
1251
1252pub fn plan_default_expr(
1253 scx: &StatementContext,
1254 expr: &Expr<Aug>,
1255 target_ty: &SqlScalarType,
1256) -> Result<HirScalarExpr, PlanError> {
1257 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1258 let ecx = &ExprContext {
1259 qcx: &qcx,
1260 name: "DEFAULT expression",
1261 scope: &Scope::empty(),
1262 relation_type: &SqlRelationType::empty(),
1263 allow_aggregates: false,
1264 allow_subqueries: false,
1265 allow_parameters: false,
1266 allow_windows: false,
1267 };
1268 let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1269 Ok(hir)
1270}
1271
1272pub fn plan_params<'a>(
1273 scx: &'a StatementContext,
1274 params: Vec<Expr<Aug>>,
1275 desc: &StatementDesc,
1276) -> Result<Params, PlanError> {
1277 if params.len() != desc.param_types.len() {
1278 sql_bail!(
1279 "expected {} params, got {}",
1280 desc.param_types.len(),
1281 params.len()
1282 );
1283 }
1284
1285 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1286
1287 let mut datums = Row::default();
1288 let mut packer = datums.packer();
1289 let mut actual_types = Vec::new();
1290 let temp_storage = &RowArena::new();
1291 for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1292 transform_ast::transform(scx, &mut expr)?;
1293
1294 let ecx = execute_expr_context(&qcx);
1295 let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1296 let actual_ty = ecx.scalar_type(&ex);
1297 if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1298 return Err(PlanError::WrongParameterType(
1299 i + 1,
1300 ecx.humanize_sql_scalar_type(expected_ty, false),
1301 ecx.humanize_sql_scalar_type(&actual_ty, false),
1302 ));
1303 }
1304 let ex = ex.lower_uncorrelated(scx.catalog.system_vars())?;
1305 let evaled = ex.eval(&[], temp_storage)?;
1306 packer.push(evaled);
1307 actual_types.push(actual_ty);
1308 }
1309 Ok(Params {
1310 datums,
1311 execute_types: actual_types,
1312 expected_types: desc.param_types.clone(),
1313 })
1314}
1315
/// Lazily initialized empty scope, borrowed by `execute_expr_context` so that
/// the returned `ExprContext` does not need to own a scope.
static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
/// Lazily initialized empty relation type, borrowed by `execute_expr_context`
/// for the same reason as `EXECUTE_CONTEXT_SCOPE`.
static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1318
1319pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1321 ExprContext {
1322 qcx,
1323 name: "EXECUTE",
1324 scope: &EXECUTE_CONTEXT_SCOPE,
1325 relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1326 allow_aggregates: false,
1327 allow_subqueries: false,
1328 allow_parameters: false,
1329 allow_windows: false,
1330 }
1331}
1332
/// The cast context (`CastContext::Assignment`) that `plan_params` uses when
/// checking whether a supplied parameter's type can be cast to the type the
/// statement expects.
pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
    LazyLock::new(|| CastContext::Assignment);
1339
1340pub fn plan_index_exprs<'a>(
1341 scx: &'a StatementContext,
1342 on_desc: &RelationDesc,
1343 exprs: Vec<Expr<Aug>>,
1344) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1345 let scope = Scope::from_source(None, on_desc.iter_names());
1346 let qcx = QueryContext::root(scx, QueryLifetime::Index);
1347
1348 let ecx = &ExprContext {
1349 qcx: &qcx,
1350 name: "CREATE INDEX",
1351 scope: &scope,
1352 relation_type: on_desc.typ(),
1353 allow_aggregates: false,
1354 allow_subqueries: false,
1355 allow_parameters: false,
1356 allow_windows: false,
1357 };
1358 let repr_col_types: Vec<ReprColumnType> = on_desc
1359 .typ()
1360 .column_types
1361 .iter()
1362 .map(ReprColumnType::from)
1363 .collect();
1364 let mut out = vec![];
1365 for mut expr in exprs {
1366 transform_ast::transform(scx, &mut expr)?;
1367 let expr = plan_expr_or_col_index(ecx, &expr)?;
1368 let mut expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
1369 expr.reduce(&repr_col_types);
1370 out.push(expr);
1371 }
1372 Ok(out)
1373}
1374
1375fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1376 match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1377 Some(column) => Ok(HirScalarExpr::column(column)),
1378 _ => plan_expr(ecx, e)?.type_as_any(ecx),
1379 }
1380}
1381
1382fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1383 match e {
1384 Expr::Value(Value::Number(n)) => {
1385 let n = n.parse::<usize>().map_err(|e| {
1386 sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1387 })?;
1388 if n < 1 || n > max {
1389 sql_bail!(
1390 "column reference {} in {} is out of range (1 - {})",
1391 n,
1392 name,
1393 max
1394 );
1395 }
1396 Ok(Some(n - 1))
1397 }
1398 _ => Ok(None),
1399 }
1400}
1401
/// The result of planning a query via `plan_query`.
///
/// `expr` has not had `order_by`, `limit`, `offset`, or `project` applied to
/// it; consumers such as `plan_nested_query` apply them afterwards.
struct PlannedQuery {
    /// The planned relation expression, before ordering, limiting, and
    /// projection.
    expr: HirRelationExpr,
    /// The scope returned alongside `expr` by the body planner.
    scope: Scope,
    /// The ordering planned from the query's `ORDER BY` clause.
    order_by: Vec<ColumnOrder>,
    /// The planned `LIMIT` expression, if the query has one.
    limit: Option<HirScalarExpr>,
    /// The planned `OFFSET`. As produced by `plan_query_inner` this is either
    /// an `Int64` literal (zero when the query has no `OFFSET`) or an
    /// expression that contains parameters.
    offset: HirScalarExpr,
    /// The columns of `expr` to keep, in output order.
    project: Vec<usize>,
    /// Group size hints extracted from the `SELECT` options (the default for
    /// bodies that are not a plain `SELECT`).
    group_size_hints: GroupSizeHints,
}
1416
1417fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1418 qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1419}
1420
/// Plans a query; the recursion-checked entry point is `plan_query`.
///
/// Planning proceeds in four steps:
///
/// 1. The query's CTEs are planned and registered in `qcx.ctes`.
/// 2. The `LIMIT` and `OFFSET` clauses are planned.
/// 3. The body is planned, together with the `ORDER BY` clause.
/// 4. The planned body is wrapped in `Let`/`LetRec` bindings for the CTEs,
///    which are removed from `qcx.ctes`; any CTE descriptions they shadowed are
///    reinstated.
fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    // Plan the CTEs and introduce their descriptions into `qcx.ctes`. They are
    // removed again once the body has been planned (see the end of this
    // function).
    let cte_bindings = plan_ctes(qcx, q)?;

    let limit = match &q.limit {
        None => None,
        Some(Limit {
            quantity,
            with_ties: false,
        }) => {
            let ecx = &ExprContext {
                qcx,
                name: "LIMIT",
                scope: &Scope::empty(),
                relation_type: &SqlRelationType::empty(),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let limit = plan_expr(ecx, quantity)?;
            let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;

            let limit = if limit.is_constant() {
                // Constant limits are evaluated right away so that invalid
                // values are rejected at planning time.
                let arena = RowArena::new();
                let limit = limit.lower_uncorrelated(qcx.scx.catalog.system_vars())?;

                match limit.eval(&[], &arena)? {
                    // Non-negative integers and NULL are kept as literals.
                    d @ Datum::Int64(v) if v >= 0 => {
                        HirScalarExpr::literal(d, SqlScalarType::Int64)
                    }
                    d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
                }
            } else {
                // Non-constant LIMIT expressions are gated behind a feature
                // flag.
                qcx.scx
                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
                limit
            };

            Some(limit)
        }
        Some(Limit {
            quantity: _,
            with_ties: true,
        }) => bail_unsupported!("FETCH ... WITH TIES"),
    };

    let offset = match &q.offset {
        // A missing OFFSET is represented as a literal zero.
        None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
        Some(offset) => {
            let ecx = &ExprContext {
                qcx,
                name: "OFFSET",
                scope: &Scope::empty(),
                relation_type: &SqlRelationType::empty(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: true,
                allow_windows: false,
            };
            let offset = plan_expr(ecx, offset)?;
            let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;

            let offset = if offset.is_constant() {
                // Constant offsets are folded to a validated `Int64` literal.
                let offset_value = offset_into_value(offset)?;
                HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
            } else {
                // The only non-constant offsets accepted are those that
                // contain parameters; they are kept unevaluated here.
                if !offset.contains_parameters() {
                    return Err(PlanError::InvalidOffset(format!(
                        "must be simplifiable to a constant, possibly after parameter binding, got {}",
                        offset
                    )));
                }
                offset
            };
            offset
        }
    };

    let mut planned_query = match &q.body {
        SetExpr::Select(s) => {
            // Extract the group size hints from the SELECT options.
            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;

            // A plain SELECT plans its ORDER BY together with the body.
            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
            PlannedQuery {
                expr: plan.expr,
                scope: plan.scope,
                order_by: plan.order_by,
                project: plan.project,
                limit,
                offset,
                group_size_hints,
            }
        }
        _ => {
            // For every other body, ORDER BY is planned against the body's
            // output columns.
            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
            let ecx = &ExprContext {
                qcx,
                name: "ORDER BY clause of a set expression",
                scope: &scope,
                relation_type: &qcx.relation_type(&expr),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
            // Project back to the body's own columns, dropping any columns
            // appended by `map_exprs`.
            let project = (0..ecx.relation_type.arity()).collect();
            PlannedQuery {
                expr: expr.map(map_exprs),
                scope,
                order_by,
                limit,
                project,
                offset,
                group_size_hints: GroupSizeHints::default(),
            }
        }
    };

    // Wrap the planned body in bindings for the CTEs and remove them from
    // `qcx.ctes`, reinstating whatever they shadowed.
    match &q.ctes {
        CteBlock::Simple(_) => {
            // Bindings are wrapped in reverse order, so the first CTE ends up
            // as the outermost `Let`.
            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    planned_query.expr = HirRelationExpr::Let {
                        name: cte.name,
                        id: id.clone(),
                        value: Box::new(value),
                        body: Box::new(planned_query.expr),
                    };
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
        }
        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
            let MutRecBlockOptionExtracted {
                recursion_limit,
                return_at_recursion_limit,
                error_at_recursion_limit,
                seen: _,
            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
            // At most one of the three recursion limit options may be given;
            // each maps to an iteration count plus a "return at limit" flag.
            let limit = match (
                recursion_limit,
                return_at_recursion_limit,
                error_at_recursion_limit,
            ) {
                (None, None, None) => None,
                (Some(max_iters), None, None) => {
                    Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT))
                }
                (None, Some(max_iters), None) => Some((max_iters, true)),
                (None, None, Some(max_iters)) => Some((max_iters, false)),
                _ => {
                    return Err(InvalidWmrRecursionLimit(
                        "More than one recursion limit given. \
                         Please give at most one of RECURSION LIMIT, \
                         ERROR AT RECURSION LIMIT, \
                         RETURN AT RECURSION LIMIT."
                            .to_owned(),
                    ));
                }
            }
            .try_map(|(max_iters, return_at_limit)| {
                // A limit of zero cannot be represented as `NonZeroU64` and is
                // rejected.
                Ok::<LetRecLimit, PlanError>(LetRecLimit {
                    max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit(
                        "Recursion limit has to be greater than 0.".to_owned(),
                    ))?,
                    return_at_limit: *return_at_limit,
                })
            })?;

            let mut bindings = Vec::new();
            for (id, value, shadowed_val) in cte_bindings.into_iter() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    bindings.push((cte.name, id, value, cte.desc.into_typ()));
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
            // Only introduce a `LetRec` when there is at least one binding.
            if !bindings.is_empty() {
                planned_query.expr = HirRelationExpr::LetRec {
                    limit,
                    bindings,
                    body: Box::new(planned_query.expr),
                }
            }
        }
    }

    Ok(planned_query)
}
1630
1631pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1633 let offset = offset
1634 .try_into_literal_int64()
1635 .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1636 if offset < 0 {
1637 return Err(PlanError::InvalidOffset(format!(
1638 "must not be negative, got {}",
1639 offset
1640 )));
1641 }
1642 Ok(offset)
1643}
1644
// Generates `MutRecBlockOptionExtracted`, which `plan_query_inner` builds from
// a `WITH MUTUALLY RECURSIVE` block's options via `try_from` and destructures
// into `recursion_limit`, `return_at_recursion_limit`,
// `error_at_recursion_limit`, and `seen`.
generate_extracted_config!(
    MutRecBlockOption,
    (RecursionLimit, u64),
    (ReturnAtRecursionLimit, u64),
    (ErrorAtRecursionLimit, u64)
);
1651
1652pub fn plan_ctes(
1657 qcx: &mut QueryContext,
1658 q: &Query<Aug>,
1659) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1660 let mut result = Vec::new();
1662 let mut shadowed_descs = BTreeMap::new();
1665
1666 if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1668 sql_bail!(
1669 "WITH query name {} specified more than once",
1670 normalize::ident_ref(ident).quoted()
1671 )
1672 }
1673
1674 match &q.ctes {
1675 CteBlock::Simple(ctes) => {
1676 for cte in ctes.iter() {
1678 let cte_name = normalize::ident(cte.alias.name.clone());
1679 let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1680 let typ = qcx.relation_type(&val);
1681 let mut desc = RelationDesc::new(typ, scope.column_names());
1682 plan_utils::maybe_rename_columns(
1683 format!("CTE {}", cte.alias.name),
1684 &mut desc,
1685 &cte.alias.columns,
1686 )?;
1687 let shadowed = qcx.ctes.insert(
1689 cte.id,
1690 CteDesc {
1691 name: cte_name,
1692 desc,
1693 },
1694 );
1695
1696 result.push((cte.id, val, shadowed));
1697 }
1698 }
1699 CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1700 for cte in ctes.iter() {
1702 let cte_name = normalize::ident(cte.name.clone());
1703 let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1704 for column in cte.columns.iter() {
1705 desc_columns.push((
1706 normalize::column_name(column.name.clone()),
1707 SqlColumnType {
1708 scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1709 nullable: true,
1710 },
1711 ));
1712 }
1713 let desc = RelationDesc::from_names_and_types(desc_columns);
1714 let shadowed = qcx.ctes.insert(
1715 cte.id,
1716 CteDesc {
1717 name: cte_name,
1718 desc,
1719 },
1720 );
1721 if let Some(shadowed) = shadowed {
1723 shadowed_descs.insert(cte.id, shadowed);
1724 }
1725 }
1726
1727 for cte in ctes.iter() {
1729 let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1730
1731 let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1732
1733 if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1734 sql_bail!(
1737 "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1738 );
1739 }
1740
1741 if !proposed_typ.keys.is_empty() {
1742 sql_bail!("[internal error]: WMR CTEs do not support keys");
1745 }
1746
1747 let derived_typ = qcx.relation_type(&val);
1749
1750 let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1751 let cte_name = normalize::ident(cte.name.clone());
1752 let proposed_typ = proposed_typ
1753 .column_types
1754 .iter()
1755 .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1756 .collect::<Vec<_>>();
1757 let inferred_typ = derived_typ
1758 .column_types
1759 .iter()
1760 .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1761 .collect::<Vec<_>>();
1762 Err(PlanError::RecursiveTypeMismatch(
1763 cte_name,
1764 proposed_typ,
1765 inferred_typ,
1766 ))
1767 };
1768
1769 if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1770 return type_err(proposed_typ, derived_typ);
1771 }
1772
1773 let val = match cast_relation(
1775 qcx,
1776 CastContext::Assignment,
1781 val,
1782 proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1783 ) {
1784 Ok(val) => val,
1785 Err(_) => return type_err(proposed_typ, derived_typ),
1786 };
1787
1788 result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1789 }
1790 }
1791 }
1792
1793 Ok(result)
1794}
1795
1796pub fn plan_nested_query(
1797 qcx: &mut QueryContext,
1798 q: &Query<Aug>,
1799) -> Result<(HirRelationExpr, Scope), PlanError> {
1800 let PlannedQuery {
1801 mut expr,
1802 scope,
1803 order_by,
1804 limit,
1805 offset,
1806 project,
1807 group_size_hints,
1808 } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1809 if limit.is_some()
1810 || !offset
1811 .clone()
1812 .try_into_literal_int64()
1813 .is_ok_and(|offset| offset == 0)
1814 {
1815 expr = HirRelationExpr::top_k(
1816 expr,
1817 vec![],
1818 order_by,
1819 limit,
1820 offset,
1821 group_size_hints.limit_input_group_size,
1822 );
1823 }
1824 Ok((expr.project(project), scope))
1825}
1826
1827fn plan_set_expr(
1828 qcx: &mut QueryContext,
1829 q: &SetExpr<Aug>,
1830) -> Result<(HirRelationExpr, Scope), PlanError> {
1831 match q {
1832 SetExpr::Select(select) => {
1833 let order_by_exprs = Vec::new();
1834 let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
1835 assert!(plan.order_by.is_empty());
1838 Ok((plan.expr.project(plan.project), plan.scope))
1839 }
1840 SetExpr::SetOperation {
1841 op,
1842 all,
1843 left,
1844 right,
1845 } => {
1846 let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
1848 let (right_expr, right_scope) =
1849 qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;
1850
1851 let left_type = qcx.relation_type(&left_expr);
1853 let right_type = qcx.relation_type(&right_expr);
1854 if left_type.arity() != right_type.arity() {
1855 sql_bail!(
1856 "each {} query must have the same number of columns: {} vs {}",
1857 op,
1858 left_type.arity(),
1859 right_type.arity(),
1860 );
1861 }
1862
1863 let left_ecx = &ExprContext {
1868 qcx,
1869 name: &op.to_string(),
1870 scope: &left_scope,
1871 relation_type: &left_type,
1872 allow_aggregates: false,
1873 allow_subqueries: false,
1874 allow_parameters: false,
1875 allow_windows: false,
1876 };
1877 let right_ecx = &ExprContext {
1878 qcx,
1879 name: &op.to_string(),
1880 scope: &right_scope,
1881 relation_type: &right_type,
1882 allow_aggregates: false,
1883 allow_subqueries: false,
1884 allow_parameters: false,
1885 allow_windows: false,
1886 };
1887 let mut left_casts = vec![];
1888 let mut right_casts = vec![];
1889 for (i, (left_type, right_type)) in left_type
1890 .column_types
1891 .iter()
1892 .zip_eq(right_type.column_types.iter())
1893 .enumerate()
1894 {
1895 let types = &[
1896 CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
1897 CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
1898 ];
1899 let target =
1900 typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
1901 match typeconv::plan_cast(
1902 left_ecx,
1903 CastContext::Implicit,
1904 HirScalarExpr::column(i),
1905 &target,
1906 ) {
1907 Ok(expr) => left_casts.push(expr),
1908 Err(_) => sql_bail!(
1909 "{} types {} and {} cannot be matched",
1910 op,
1911 qcx.humanize_sql_scalar_type(&left_type.scalar_type, false),
1912 qcx.humanize_sql_scalar_type(&target, false),
1913 ),
1914 }
1915 match typeconv::plan_cast(
1916 right_ecx,
1917 CastContext::Implicit,
1918 HirScalarExpr::column(i),
1919 &target,
1920 ) {
1921 Ok(expr) => right_casts.push(expr),
1922 Err(_) => sql_bail!(
1923 "{} types {} and {} cannot be matched",
1924 op,
1925 qcx.humanize_sql_scalar_type(&target, false),
1926 qcx.humanize_sql_scalar_type(&right_type.scalar_type, false),
1927 ),
1928 }
1929 }
1930 let lhs = if left_casts
1931 .iter()
1932 .enumerate()
1933 .any(|(i, e)| e != &HirScalarExpr::column(i))
1934 {
1935 let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
1936 left_expr.map(left_casts).project(project_key)
1937 } else {
1938 left_expr
1939 };
1940 let rhs = if right_casts
1941 .iter()
1942 .enumerate()
1943 .any(|(i, e)| e != &HirScalarExpr::column(i))
1944 {
1945 let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
1946 right_expr.map(right_casts).project(project_key)
1947 } else {
1948 right_expr
1949 };
1950
1951 let relation_expr = match op {
1952 SetOperator::Union => {
1953 if *all {
1954 lhs.union(rhs)
1955 } else {
1956 lhs.union(rhs).distinct()
1957 }
1958 }
1959 SetOperator::Except => Hir::except(all, lhs, rhs),
1960 SetOperator::Intersect => {
1961 let left_clone = lhs.clone();
1967 if *all {
1968 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1969 } else {
1970 lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1971 .distinct()
1972 }
1973 }
1974 };
1975 let scope = Scope::from_source(
1976 None,
1977 left_scope.column_names(),
1979 );
1980
1981 Ok((relation_expr, scope))
1982 }
1983 SetExpr::Values(Values(values)) => plan_values(qcx, values),
1984 SetExpr::Table(name) => {
1985 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
1986 Ok((expr, scope))
1987 }
1988 SetExpr::Query(query) => {
1989 let (expr, scope) = plan_nested_query(qcx, query)?;
1990 Ok((expr, scope))
1991 }
1992 SetExpr::Show(stmt) => {
1993 if !qcx.lifetime.allow_show() {
2007 return Err(PlanError::ShowCommandInView);
2008 }
2009
2010 fn to_hirscope(
2013 plan: ShowCreatePlan,
2014 desc: StatementDesc,
2015 ) -> Result<(HirRelationExpr, Scope), PlanError> {
2016 let rows = vec![plan.row.iter().collect::<Vec<_>>()];
2017 let desc = desc.relation_desc.ok_or_else(|| {
2018 internal_err!("statement description missing relation descriptor")
2019 })?;
2020 let scope = Scope::from_source(None, desc.iter_names());
2021 let expr = HirRelationExpr::constant(rows, desc.into_typ());
2022 Ok((expr, scope))
2023 }
2024
2025 match stmt.clone() {
2026 ShowStatement::ShowColumns(stmt) => {
2027 show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
2028 }
2029 ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
2030 show::plan_show_create_connection(qcx.scx, stmt.clone())?,
2031 show::describe_show_create_connection(qcx.scx, stmt)?,
2032 ),
2033 ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
2034 show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
2035 show::describe_show_create_cluster(qcx.scx, stmt)?,
2036 ),
2037 ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
2038 show::plan_show_create_index(qcx.scx, stmt.clone())?,
2039 show::describe_show_create_index(qcx.scx, stmt)?,
2040 ),
2041 ShowStatement::ShowCreateSink(stmt) => to_hirscope(
2042 show::plan_show_create_sink(qcx.scx, stmt.clone())?,
2043 show::describe_show_create_sink(qcx.scx, stmt)?,
2044 ),
2045 ShowStatement::ShowCreateSource(stmt) => to_hirscope(
2046 show::plan_show_create_source(qcx.scx, stmt.clone())?,
2047 show::describe_show_create_source(qcx.scx, stmt)?,
2048 ),
2049 ShowStatement::ShowCreateTable(stmt) => to_hirscope(
2050 show::plan_show_create_table(qcx.scx, stmt.clone())?,
2051 show::describe_show_create_table(qcx.scx, stmt)?,
2052 ),
2053 ShowStatement::ShowCreateView(stmt) => to_hirscope(
2054 show::plan_show_create_view(qcx.scx, stmt.clone())?,
2055 show::describe_show_create_view(qcx.scx, stmt)?,
2056 ),
2057 ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
2058 show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
2059 show::describe_show_create_materialized_view(qcx.scx, stmt)?,
2060 ),
2061 ShowStatement::ShowCreateType(stmt) => to_hirscope(
2062 show::plan_show_create_type(qcx.scx, stmt.clone())?,
2063 show::describe_show_create_type(qcx.scx, stmt)?,
2064 ),
2065 ShowStatement::ShowObjects(stmt) => {
2066 show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
2067 }
2068 ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
2069 ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
2070 }
2071 }
2072 }
2073}
2074
2075fn plan_values(
2077 qcx: &QueryContext,
2078 values: &[Vec<Expr<Aug>>],
2079) -> Result<(HirRelationExpr, Scope), PlanError> {
2080 assert!(!values.is_empty());
2081
2082 let ecx = &ExprContext {
2083 qcx,
2084 name: "VALUES",
2085 scope: &Scope::empty(),
2086 relation_type: &SqlRelationType::empty(),
2087 allow_aggregates: false,
2088 allow_subqueries: true,
2089 allow_parameters: true,
2090 allow_windows: false,
2091 };
2092
2093 let ncols = values[0].len();
2094 let nrows = values.len();
2095
2096 let mut cols = vec![vec![]; ncols];
2099 for row in values {
2100 if row.len() != ncols {
2101 sql_bail!(
2102 "VALUES expression has varying number of columns: {} vs {}",
2103 row.len(),
2104 ncols
2105 );
2106 }
2107 for (i, v) in row.iter().enumerate() {
2108 cols[i].push(v);
2109 }
2110 }
2111
2112 let mut col_iters = Vec::with_capacity(ncols);
2114 let mut col_types = Vec::with_capacity(ncols);
2115 for col in &cols {
2116 let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2117 let mut col_type = ecx.column_type(&col[0]);
2118 for val in &col[1..] {
2119 col_type = col_type.sql_union(&ecx.column_type(val))?; }
2121 col_types.push(col_type);
2122 col_iters.push(col.into_iter());
2123 }
2124
2125 let mut exprs = vec![];
2127 for _ in 0..nrows {
2128 for i in 0..ncols {
2129 exprs.push(col_iters[i].next().unwrap());
2130 }
2131 }
2132 let out = HirRelationExpr::CallTable {
2133 func: TableFunc::Wrap {
2134 width: ncols,
2135 types: col_types,
2136 },
2137 exprs,
2138 };
2139
2140 let mut scope = Scope::empty();
2142 for i in 0..ncols {
2143 let name = format!("column{}", i + 1);
2144 scope.items.push(ScopeItem::from_column_name(name));
2145 }
2146
2147 Ok((out, scope))
2148}
2149
2150fn plan_values_insert(
2160 qcx: &QueryContext,
2161 target_names: &[&ColumnName],
2162 target_types: &[&SqlScalarType],
2163 values: &[Vec<Expr<Aug>>],
2164) -> Result<HirRelationExpr, PlanError> {
2165 assert!(!values.is_empty());
2166
2167 if !values.iter().map(|row| row.len()).all_equal() {
2168 sql_bail!("VALUES lists must all be the same length");
2169 }
2170
2171 let ecx = &ExprContext {
2172 qcx,
2173 name: "VALUES",
2174 scope: &Scope::empty(),
2175 relation_type: &SqlRelationType::empty(),
2176 allow_aggregates: false,
2177 allow_subqueries: true,
2178 allow_parameters: true,
2179 allow_windows: false,
2180 };
2181
2182 let mut exprs = vec![];
2183 let mut types = vec![];
2184 for row in values {
2185 if row.len() > target_names.len() {
2186 sql_bail!("INSERT has more expressions than target columns");
2187 }
2188 for (column, val) in row.into_iter().enumerate() {
2189 let target_type = &target_types[column];
2190 let val = plan_expr(ecx, val)?;
2191 let val = typeconv::plan_coerce(ecx, val, target_type)?;
2192 let source_type = &ecx.scalar_type(&val);
2193 let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2194 Ok(val) => val,
2195 Err(_) => sql_bail!(
2196 "column {} is of type {} but expression is of type {}",
2197 target_names[column].quoted(),
2198 qcx.humanize_sql_scalar_type(target_type, false),
2199 qcx.humanize_sql_scalar_type(source_type, false),
2200 ),
2201 };
2202 if column >= types.len() {
2203 types.push(ecx.column_type(&val));
2204 } else {
2205 types[column] = types[column].sql_union(&ecx.column_type(&val))?; }
2207 exprs.push(val);
2208 }
2209 }
2210
2211 Ok(HirRelationExpr::CallTable {
2212 func: TableFunc::Wrap {
2213 width: values[0].len(),
2214 types,
2215 },
2216 exprs,
2217 })
2218}
2219
2220fn plan_join_identity() -> (HirRelationExpr, Scope) {
2221 let typ = SqlRelationType::new(vec![]);
2222 let expr = HirRelationExpr::constant(vec![vec![]], typ);
2223 let scope = Scope::empty();
2224 (expr, scope)
2225}
2226
/// The result of planning a `SELECT` via `plan_select_from_where`.
///
/// `expr` has not had `project` applied to it; callers apply the projection
/// themselves (see `plan_set_expr`) or carry it along in a `PlannedQuery`.
#[derive(Debug)]
struct SelectPlan {
    /// The planned relation expression, before projection.
    expr: HirRelationExpr,
    /// The scope returned to callers together with the projected expression.
    scope: Scope,
    /// The ordering planned from the `ORDER BY` expressions passed in by the
    /// caller.
    order_by: Vec<ColumnOrder>,
    /// The columns of `expr` to keep, in output order.
    project: Vec<usize>,
}
2239
// Generates `SelectOptionExtracted`, which is built from a `SELECT`'s options
// via `try_from` and then converted into `GroupSizeHints`.
generate_extracted_config!(
    SelectOption,
    (ExpectedGroupSize, u64),
    (AggregateInputGroupSize, u64),
    (DistinctOnInputGroupSize, u64),
    (LimitInputGroupSize, u64)
);
2247
2248fn plan_select_from_where(
2266 qcx: &QueryContext,
2267 mut s: Select<Aug>,
2268 mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2269) -> Result<SelectPlan, PlanError> {
2270 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2277 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2278
2279 let (mut relation_expr, mut from_scope) =
2281 s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2282 let (left, left_scope) = l;
2283 plan_join(
2284 qcx,
2285 left,
2286 left_scope,
2287 &Join {
2288 relation: TableFactor::NestedJoin {
2289 join: Box::new(twj.clone()),
2290 alias: None,
2291 },
2292 join_operator: JoinOperator::CrossJoin,
2293 },
2294 )
2295 })?;
2296
2297 if let Some(selection) = &s.selection {
2299 let ecx = &ExprContext {
2300 qcx,
2301 name: "WHERE clause",
2302 scope: &from_scope,
2303 relation_type: &qcx.relation_type(&relation_expr),
2304 allow_aggregates: false,
2305 allow_subqueries: true,
2306 allow_parameters: true,
2307 allow_windows: false,
2308 };
2309 let expr = plan_expr(ecx, selection)
2310 .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2311 .type_as(ecx, &SqlScalarType::Bool)?;
2312 relation_expr = relation_expr.filter(vec![expr]);
2313 }
2314
2315 let (aggregates, table_funcs) = {
2318 let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2319 visitor.visit_select_mut(&mut s);
2320 for o in order_by_exprs.iter_mut() {
2321 visitor.visit_order_by_expr_mut(o);
2322 }
2323 visitor.into_result()?
2324 };
2325 let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2326 if !table_funcs.is_empty() {
2327 let (expr, scope) = plan_scalar_table_funcs(
2328 qcx,
2329 table_funcs,
2330 &mut table_func_names,
2331 &relation_expr,
2332 &from_scope,
2333 )?;
2334 relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2335 from_scope = from_scope.product(scope)?;
2336 }
2337
2338 let projection = {
2340 let ecx = &ExprContext {
2341 qcx,
2342 name: "SELECT clause",
2343 scope: &from_scope,
2344 relation_type: &qcx.relation_type(&relation_expr),
2345 allow_aggregates: true,
2346 allow_subqueries: true,
2347 allow_parameters: true,
2348 allow_windows: true,
2349 };
2350 let mut out = vec![];
2351 for si in &s.projection {
2352 if *si == SelectItem::Wildcard && s.from.is_empty() {
2353 sql_bail!("SELECT * with no tables specified is not valid");
2354 }
2355 out.extend(expand_select_item(ecx, si, &table_func_names)?);
2356 }
2357 out
2358 };
2359
2360 let (mut group_scope, select_all_mapping) = {
2364 let ecx = &ExprContext {
2366 qcx,
2367 name: "GROUP BY clause",
2368 scope: &from_scope,
2369 relation_type: &qcx.relation_type(&relation_expr),
2370 allow_aggregates: false,
2371 allow_subqueries: true,
2372 allow_parameters: true,
2373 allow_windows: false,
2374 };
2375 let mut group_key = vec![];
2376 let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2377 let mut group_hir_exprs = vec![];
2378 let mut group_scope = Scope::empty();
2379 let mut select_all_mapping = BTreeMap::new();
2380
2381 for group_expr in &s.group_by {
2382 let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2383 let new_column = group_key.len();
2384
2385 if let Some(group_expr) = group_expr {
2386 if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2390 existing_scope_item.exprs.insert(group_expr.clone());
2391 continue;
2392 }
2393 }
2394
2395 let mut scope_item = if let HirScalarExpr::Column(
2396 ColumnRef {
2397 level: 0,
2398 column: old_column,
2399 },
2400 _name,
2401 ) = &expr
2402 {
2403 select_all_mapping.insert(*old_column, new_column);
2409 let scope_item = ecx.scope.items[*old_column].clone();
2410 scope_item
2411 } else {
2412 ScopeItem::empty()
2413 };
2414
2415 if let Some(group_expr) = group_expr.cloned() {
2416 scope_item.exprs.insert(group_expr);
2417 }
2418
2419 group_key.push(from_scope.len() + group_exprs.len());
2420 group_hir_exprs.push(expr.clone());
2421 group_exprs.insert(expr, scope_item);
2422 }
2423
2424 assert_eq!(group_hir_exprs.len(), group_exprs.len());
2425 for expr in &group_hir_exprs {
2426 if let Some(scope_item) = group_exprs.remove(expr) {
2427 group_scope.items.push(scope_item);
2428 }
2429 }
2430
2431 let ecx = &ExprContext {
2433 qcx,
2434 name: "aggregate function",
2435 scope: &from_scope,
2436 relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2437 allow_aggregates: false,
2438 allow_subqueries: true,
2439 allow_parameters: true,
2440 allow_windows: false,
2441 };
2442 let mut agg_exprs = vec![];
2443 for sql_function in aggregates {
2444 if sql_function.over.is_some() {
2445 unreachable!(
2446 "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2447 );
2448 }
2449 agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2450 group_scope
2451 .items
2452 .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2453 }
2454 if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2455 relation_expr = relation_expr.map(group_hir_exprs).reduce(
2457 group_key,
2458 agg_exprs,
2459 group_size_hints.aggregate_input_group_size,
2460 );
2461
2462 for i in 0..from_scope.len() {
2468 if !select_all_mapping.contains_key(&i) {
2469 let scope_item = &ecx.scope.items[i];
2470 group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2471 table_name: scope_item.table_name.clone(),
2472 column_name: scope_item.column_name.clone(),
2473 allow_unqualified_references: scope_item.allow_unqualified_references,
2474 });
2475 }
2476 }
2477
2478 (group_scope, select_all_mapping)
2479 } else {
2480 (
2482 from_scope.clone(),
2483 (0..from_scope.len()).map(|i| (i, i)).collect(),
2484 )
2485 }
2486 };
2487
2488 if let Some(ref having) = s.having {
2490 let ecx = &ExprContext {
2491 qcx,
2492 name: "HAVING clause",
2493 scope: &group_scope,
2494 relation_type: &qcx.relation_type(&relation_expr),
2495 allow_aggregates: true,
2496 allow_subqueries: true,
2497 allow_parameters: true,
2498 allow_windows: false,
2499 };
2500 let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2501 relation_expr = relation_expr.filter(vec![expr]);
2502 }
2503
2504 let window_funcs = {
2517 let mut visitor = WindowFuncCollector::default();
2518 visitor.visit_select(&s);
2522 for o in order_by_exprs.iter() {
2523 visitor.visit_order_by_expr(o);
2524 }
2525 visitor.into_result()
2526 };
2527 for window_func in window_funcs {
2528 let ecx = &ExprContext {
2529 qcx,
2530 name: "window function",
2531 scope: &group_scope,
2532 relation_type: &qcx.relation_type(&relation_expr),
2533 allow_aggregates: true,
2534 allow_subqueries: true,
2535 allow_parameters: true,
2536 allow_windows: true,
2537 };
2538 relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2539 group_scope.items.push(ScopeItem::from_expr(window_func));
2540 }
2541 if let Some(ref qualify) = s.qualify {
2548 let ecx = &ExprContext {
2549 qcx,
2550 name: "QUALIFY clause",
2551 scope: &group_scope,
2552 relation_type: &qcx.relation_type(&relation_expr),
2553 allow_aggregates: true,
2554 allow_subqueries: true,
2555 allow_parameters: true,
2556 allow_windows: true,
2557 };
2558 let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2559 relation_expr = relation_expr.filter(vec![expr]);
2560 }
2561
2562 let output_columns = {
2564 let mut new_exprs = vec![];
2565 let mut new_type = qcx.relation_type(&relation_expr);
2566 let mut output_columns = vec![];
2567 for (select_item, column_name) in &projection {
2568 let ecx = &ExprContext {
2569 qcx,
2570 name: "SELECT clause",
2571 scope: &group_scope,
2572 relation_type: &new_type,
2573 allow_aggregates: true,
2574 allow_subqueries: true,
2575 allow_parameters: true,
2576 allow_windows: true,
2577 };
2578 let expr = match select_item {
2579 ExpandedSelectItem::InputOrdinal(i) => {
2580 if let Some(column) = select_all_mapping.get(i).copied() {
2581 HirScalarExpr::column(column)
2582 } else {
2583 return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2584 }
2585 }
2586 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2587 };
2588 if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2589 output_columns.push((column, column_name));
2591 } else {
2592 let typ = ecx.column_type(&expr);
2599 new_type.column_types.push(typ);
2600 new_exprs.push(expr);
2601 output_columns.push((group_scope.len(), column_name));
2602 group_scope
2603 .items
2604 .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2605 }
2606 }
2607 relation_expr = relation_expr.map(new_exprs);
2608 output_columns
2609 };
2610 let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2611
2612 let order_by = {
2614 let relation_type = qcx.relation_type(&relation_expr);
2615 let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2616 &ExprContext {
2617 qcx,
2618 name: "ORDER BY clause",
2619 scope: &group_scope,
2620 relation_type: &relation_type,
2621 allow_aggregates: true,
2622 allow_subqueries: true,
2623 allow_parameters: true,
2624 allow_windows: true,
2625 },
2626 &order_by_exprs,
2627 &output_columns,
2628 )?;
2629
2630 match s.distinct {
2631 None => relation_expr = relation_expr.map(map_exprs),
2632 Some(Distinct::EntireRow) => {
2633 if relation_type.arity() == 0 {
2634 sql_bail!("SELECT DISTINCT must have at least one column");
2635 }
2636 if !try_push_projection_order_by(
2640 &mut relation_expr,
2641 &mut project_key,
2642 &mut order_by,
2643 ) {
2644 sql_bail!(
2645 "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2646 );
2647 }
2648 assert!(map_exprs.is_empty());
2649 relation_expr = relation_expr.distinct();
2650 }
2651 Some(Distinct::On(exprs)) => {
2652 let ecx = &ExprContext {
2653 qcx,
2654 name: "DISTINCT ON clause",
2655 scope: &group_scope,
2656 relation_type: &qcx.relation_type(&relation_expr),
2657 allow_aggregates: true,
2658 allow_subqueries: true,
2659 allow_parameters: true,
2660 allow_windows: true,
2661 };
2662
2663 let mut distinct_exprs = vec![];
2664 for expr in &exprs {
2665 let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2666 distinct_exprs.push(expr);
2667 }
2668
2669 let mut distinct_key = vec![];
2670
2671 let arity = relation_type.arity();
2681 for ord in order_by.iter().take(distinct_exprs.len()) {
2682 let mut expr = &HirScalarExpr::column(ord.column);
2685 if ord.column >= arity {
2686 expr = &map_exprs[ord.column - arity];
2687 };
2688 match distinct_exprs.iter().position(move |e| e == expr) {
2689 None => sql_bail!(
2690 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2691 ),
2692 Some(pos) => {
2693 distinct_exprs.remove(pos);
2694 }
2695 }
2696 distinct_key.push(ord.column);
2697 }
2698
2699 for expr in distinct_exprs {
2701 let column = match expr {
2704 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2705 _ => {
2706 map_exprs.push(expr);
2707 arity + map_exprs.len() - 1
2708 }
2709 };
2710 distinct_key.push(column);
2711 }
2712
2713 let distinct_len = distinct_key.len();
2718 relation_expr = HirRelationExpr::top_k(
2719 relation_expr.map(map_exprs),
2720 distinct_key,
2721 order_by.iter().skip(distinct_len).cloned().collect(),
2722 Some(HirScalarExpr::literal(
2723 Datum::Int64(1),
2724 SqlScalarType::Int64,
2725 )),
2726 HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2727 group_size_hints.distinct_on_input_group_size,
2728 );
2729 }
2730 }
2731
2732 order_by
2733 };
2734
2735 let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2740
2741 Ok(SelectPlan {
2742 expr: relation_expr,
2743 scope,
2744 order_by,
2745 project: project_key,
2746 })
2747}
2748
2749fn plan_scalar_table_funcs(
2750 qcx: &QueryContext,
2751 table_funcs: BTreeMap<Function<Aug>, String>,
2752 table_func_names: &mut BTreeMap<String, Ident>,
2753 relation_expr: &HirRelationExpr,
2754 from_scope: &Scope,
2755) -> Result<(HirRelationExpr, Scope), PlanError> {
2756 let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2757
2758 for (table_func, id) in table_funcs.iter() {
2759 table_func_names.insert(
2760 id.clone(),
2761 Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2763 );
2764 }
2765 if table_funcs.len() == 1 {
2768 let (table_func, id) = table_funcs.iter().next().unwrap();
2769 let (expr, mut scope) =
2770 plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2771
2772 let num_cols = scope.len();
2774 for i in 0..scope.len() {
2775 scope.items[i].table_name = Some(PartialItemName {
2776 database: None,
2777 schema: None,
2778 item: id.clone(),
2779 });
2780 scope.items[i].from_single_column_function = num_cols == 1;
2781 scope.items[i].allow_unqualified_references = false;
2782 }
2783 return Ok((expr, scope));
2784 }
2785 if table_funcs.keys().any(is_repeat_row) {
2786 bail_unsupported!(format!(
2789 "{} in a SELECT clause with multiple table functions",
2790 REPEAT_ROW_NAME
2791 ));
2792 }
2793 let (expr, mut scope, num_cols) =
2795 plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2796
2797 let mut i = 0;
2799 for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2800 for _ in 0..num_cols {
2801 scope.items[i].table_name = Some(PartialItemName {
2802 database: None,
2803 schema: None,
2804 item: id.clone(),
2805 });
2806 scope.items[i].from_single_column_function = num_cols == 1;
2807 scope.items[i].allow_unqualified_references = false;
2808 i += 1;
2809 }
2810 scope.items[i].table_name = Some(PartialItemName {
2814 database: None,
2815 schema: None,
2816 item: id.clone(),
2817 });
2818 scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2819 scope.items[i].allow_unqualified_references = false;
2820 i += 1;
2821 }
2822 scope.items[i].allow_unqualified_references = false;
2824 Ok((expr, scope))
2825}
2826
2827fn plan_group_by_expr<'a>(
2834 ecx: &ExprContext,
2835 group_expr: &'a Expr<Aug>,
2836 projection: &'a [(ExpandedSelectItem, ColumnName)],
2837) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2838 let plan_projection = |column: usize| match &projection[column].0 {
2839 ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2840 ExpandedSelectItem::Expr(expr) => {
2841 Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2842 }
2843 };
2844
2845 if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2848 return plan_projection(column);
2849 }
2850
2851 match group_expr {
2855 Expr::Identifier(names) => match plan_identifier(ecx, names) {
2856 Err(PlanError::UnknownColumn {
2857 table: None,
2858 column,
2859 similar,
2860 }) => {
2861 let mut iter = projection.iter().map(|(_expr, name)| name);
2864 if let Some(i) = iter.position(|n| *n == column) {
2865 if iter.any(|n| *n == column) {
2866 Err(PlanError::AmbiguousColumn(column))
2867 } else {
2868 plan_projection(i)
2869 }
2870 } else {
2871 Err(PlanError::UnknownColumn {
2874 table: None,
2875 column,
2876 similar,
2877 })
2878 }
2879 }
2880 res => Ok((Some(group_expr), res?)),
2881 },
2882 _ => Ok((
2883 Some(group_expr),
2884 plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2885 )),
2886 }
2887}
2888
2889pub(crate) fn plan_order_by_exprs(
2897 ecx: &ExprContext,
2898 order_by_exprs: &[OrderByExpr<Aug>],
2899 output_columns: &[(usize, &ColumnName)],
2900) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2901 let mut order_by = vec![];
2902 let mut map_exprs = vec![];
2903 for obe in order_by_exprs {
2904 let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2905 let column = match expr {
2908 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2909 _ => {
2910 map_exprs.push(expr);
2911 ecx.relation_type.arity() + map_exprs.len() - 1
2912 }
2913 };
2914 order_by.push(resolve_desc_and_nulls_last(obe, column));
2915 }
2916 Ok((order_by, map_exprs))
2917}
2918
2919fn plan_order_by_or_distinct_expr(
2937 ecx: &ExprContext,
2938 expr: &Expr<Aug>,
2939 output_columns: &[(usize, &ColumnName)],
2940) -> Result<HirScalarExpr, PlanError> {
2941 if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2942 return Ok(HirScalarExpr::column(output_columns[i].0));
2943 }
2944
2945 if let Expr::Identifier(names) = expr {
2946 if let [name] = &names[..] {
2947 let name = normalize::column_name(name.clone());
2948 let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2949 if let Some((i, _)) = iter.next() {
2950 match iter.next() {
2951 Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2955 _ => return Ok(HirScalarExpr::column(*i)),
2956 }
2957 }
2958 }
2959 }
2960
2961 plan_expr(ecx, expr)?.type_as_any(ecx)
2962}
2963
2964fn plan_table_with_joins(
2965 qcx: &QueryContext,
2966 table_with_joins: &TableWithJoins<Aug>,
2967) -> Result<(HirRelationExpr, Scope), PlanError> {
2968 let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2969 for join in &table_with_joins.joins {
2970 let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2971 expr = new_expr;
2972 scope = new_scope;
2973 }
2974 Ok((expr, scope))
2975}
2976
2977fn plan_table_factor(
2978 qcx: &QueryContext,
2979 table_factor: &TableFactor<Aug>,
2980) -> Result<(HirRelationExpr, Scope), PlanError> {
2981 match table_factor {
2982 TableFactor::Table { name, alias } => {
2983 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2984 let scope = plan_table_alias(scope, alias.as_ref())?;
2985 Ok((expr, scope))
2986 }
2987
2988 TableFactor::Function {
2989 function,
2990 alias,
2991 with_ordinality,
2992 } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
2993
2994 TableFactor::RowsFrom {
2995 functions,
2996 alias,
2997 with_ordinality,
2998 } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
2999
3000 TableFactor::Derived {
3001 lateral,
3002 subquery,
3003 alias,
3004 } => {
3005 let mut qcx = (*qcx).clone();
3006 if !lateral {
3007 for scope in &mut qcx.outer_scopes {
3011 if scope.lateral_barrier {
3012 break;
3013 }
3014 scope.items.clear();
3015 }
3016 }
3017 qcx.outer_scopes[0].lateral_barrier = true;
3018 let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3019 let scope = plan_table_alias(scope, alias.as_ref())?;
3020 Ok((expr, scope))
3021 }
3022
3023 TableFactor::NestedJoin { join, alias } => {
3024 let (expr, scope) = plan_table_with_joins(qcx, join)?;
3025 let scope = plan_table_alias(scope, alias.as_ref())?;
3026 Ok((expr, scope))
3027 }
3028 }
3029}
3030
3031fn plan_rows_from(
3073 qcx: &QueryContext,
3074 functions: &[Function<Aug>],
3075 alias: Option<&TableAlias>,
3076 with_ordinality: bool,
3077) -> Result<(HirRelationExpr, Scope), PlanError> {
3078 if functions.iter().any(is_repeat_row) {
3080 bail_unsupported!(format!("{} in ROWS FROM", REPEAT_ROW_NAME));
3084 }
3085
3086 if let [function] = functions {
3089 return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3090 }
3091
3092 let (expr, mut scope, num_cols) = plan_rows_from_internal(
3096 qcx,
3097 functions,
3098 Some(functions[0].name.full_item_name().clone()),
3099 )?;
3100
3101 let mut columns = Vec::new();
3103 let mut offset = 0;
3104 for (idx, cols) in num_cols.into_iter().enumerate() {
3106 for i in 0..cols {
3107 columns.push(offset + i);
3108 }
3109 offset += cols + 1;
3110
3111 scope.items.remove(offset - idx - 1);
3114 }
3115
3116 if with_ordinality {
3119 columns.push(offset);
3120 } else {
3121 scope.items.pop();
3122 }
3123
3124 let expr = expr.project(columns);
3125
3126 let scope = plan_table_alias(scope, alias)?;
3127 Ok((expr, scope))
3128}
3129
3130fn is_repeat_row(f: &Function<Aug>) -> bool {
3131 f.name.full_name_str().as_str() == format!("{}.{}", MZ_CATALOG_SCHEMA, REPEAT_ROW_NAME)
3132}
3133
3134fn plan_rows_from_internal<'a>(
3157 qcx: &QueryContext,
3158 functions: impl IntoIterator<Item = &'a Function<Aug>>,
3159 table_name: Option<FullItemName>,
3160) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3161 let mut functions = functions.into_iter();
3162 let mut num_cols = Vec::new();
3163
3164 let (mut left_expr, mut left_scope) =
3168 plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3169 num_cols.push(left_scope.len() - 1);
3170 left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3172 left_scope
3173 .items
3174 .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));
3175
3176 for function in functions {
3177 let qcx = qcx.empty_derived_context();
3179 let (right_expr, mut right_scope) =
3180 plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3181 num_cols.push(right_scope.len() - 1);
3182 let left_col = left_scope.len() - 1;
3183 let right_col = left_scope.len() + right_scope.len() - 1;
3184 let on = HirScalarExpr::call_binary(
3185 HirScalarExpr::column(left_col),
3186 HirScalarExpr::column(right_col),
3187 expr_func::Eq,
3188 );
3189 left_expr = left_expr
3190 .join(right_expr, on, JoinKind::FullOuter)
3191 .map(vec![HirScalarExpr::call_variadic(
3192 Coalesce,
3193 vec![
3194 HirScalarExpr::column(left_col),
3195 HirScalarExpr::column(right_col),
3196 ],
3197 )]);
3198
3199 left_expr = left_expr.project(
3202 (0..left_col) .chain(left_col + 1..right_col + 2) .collect(),
3205 );
3206 right_scope.items.push(left_scope.items.pop().unwrap());
3208
3209 left_scope.items.extend(right_scope.items);
3210 }
3211
3212 Ok((left_expr, left_scope, num_cols))
3213}
3214
3215fn plan_solitary_table_function(
3219 qcx: &QueryContext,
3220 function: &Function<Aug>,
3221 alias: Option<&TableAlias>,
3222 with_ordinality: bool,
3223) -> Result<(HirRelationExpr, Scope), PlanError> {
3224 let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3225
3226 let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3227 if single_column_function {
3228 let item = &mut scope.items[0];
3229
3230 item.from_single_column_function = true;
3233
3234 if let Some(alias) = alias {
3249 if let ScopeItem {
3250 table_name: Some(table_name),
3251 column_name,
3252 ..
3253 } = item
3254 {
3255 if table_name.item.as_str() == column_name.as_str() {
3256 *column_name = normalize::column_name(alias.name.clone());
3257 }
3258 }
3259 }
3260 }
3261
3262 let scope = plan_table_alias(scope, alias)?;
3263 Ok((expr, scope))
3264}
3265
3266fn plan_table_function_internal(
3271 qcx: &QueryContext,
3272 Function {
3273 name,
3274 args,
3275 filter,
3276 over,
3277 distinct,
3278 }: &Function<Aug>,
3279 with_ordinality: bool,
3280 table_name: Option<FullItemName>,
3281) -> Result<(HirRelationExpr, Scope), PlanError> {
3282 assert_none!(filter, "cannot parse table function with FILTER");
3283 assert_none!(over, "cannot parse table function with OVER");
3284 assert!(!*distinct, "cannot parse table function with DISTINCT");
3285
3286 let ecx = &ExprContext {
3287 qcx,
3288 name: "table function arguments",
3289 scope: &Scope::empty(),
3290 relation_type: &SqlRelationType::empty(),
3291 allow_aggregates: false,
3292 allow_subqueries: true,
3293 allow_parameters: true,
3294 allow_windows: false,
3295 };
3296
3297 let scalar_args = match args {
3298 FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3299 FunctionArgs::Args { args, order_by } => {
3300 if !order_by.is_empty() {
3301 sql_bail!(
3302 "ORDER BY specified, but {} is not an aggregate function",
3303 name
3304 );
3305 }
3306 plan_exprs(ecx, args)?
3307 }
3308 };
3309
3310 let table_name = match table_name {
3311 Some(table_name) => table_name.item,
3312 None => name.full_item_name().item.clone(),
3313 };
3314
3315 let scope_name = Some(PartialItemName {
3316 database: None,
3317 schema: None,
3318 item: table_name,
3319 });
3320
3321 let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3322 Func::Table(impls) => {
3323 let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3324 let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3325 let expr = match tf.imp {
3326 TableFuncImpl::CallTable { mut func, exprs } => {
3327 if with_ordinality {
3328 func = TableFunc::with_ordinality(func.clone()).ok_or(
3329 PlanError::Unsupported {
3330 feature: format!("WITH ORDINALITY on {}", func),
3331 discussion_no: None,
3332 },
3333 )?;
3334 }
3335 HirRelationExpr::CallTable { func, exprs }
3336 }
3337 TableFuncImpl::Expr(expr) => {
3338 if !with_ordinality {
3339 expr
3340 } else {
3341 if qcx
3345 .scx
3346 .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3347 {
3348 tracing::error!(
3352 %name,
3353 "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3354 );
3355 expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3356 func: WindowExprType::Scalar(ScalarWindowExpr {
3357 func: ScalarWindowFunc::RowNumber,
3358 order_by: vec![],
3359 }),
3360 partition_by: vec![],
3361 order_by: vec![],
3362 })])
3363 } else {
3364 bail_unsupported!(format!(
3365 "WITH ORDINALITY or ROWS FROM with {}",
3366 name
3367 ));
3368 }
3369 }
3370 }
3371 };
3372 (expr, scope)
3373 }
3374 Func::Scalar(impls) => {
3375 let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3376 let output = expr.typ(
3377 &qcx.outer_relation_types,
3378 &SqlRelationType::new(vec![]),
3379 &qcx.scx.param_types.borrow(),
3380 );
3381
3382 let relation = SqlRelationType::new(vec![output]);
3383
3384 let function_ident = Ident::new(name.full_item_name().item.clone())?;
3385 let column_name = normalize::column_name(function_ident);
3386 let name = column_name.to_string();
3387
3388 let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3389
3390 let mut func = TableFunc::TabletizedScalar { relation, name };
3391 if with_ordinality {
3392 func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3393 feature: format!("WITH ORDINALITY on {}", func),
3394 discussion_no: None,
3395 })?;
3396 }
3397 (
3398 HirRelationExpr::CallTable {
3399 func,
3400 exprs: vec![expr],
3401 },
3402 scope,
3403 )
3404 }
3405 o => sql_bail!(
3406 "{} functions are not supported in functions in FROM",
3407 o.class()
3408 ),
3409 };
3410
3411 if with_ordinality {
3412 scope
3413 .items
3414 .push(ScopeItem::from_name(scope_name, "ordinality"));
3415 }
3416
3417 Ok((expr, scope))
3418}
3419
3420fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3421 if let Some(TableAlias {
3422 name,
3423 columns,
3424 strict,
3425 }) = alias
3426 {
3427 if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3428 sql_bail!(
3429 "{} has {} columns available but {} columns specified",
3430 name,
3431 scope.items.len(),
3432 columns.len()
3433 );
3434 }
3435
3436 let table_name = normalize::ident(name.to_owned());
3437 for (i, item) in scope.items.iter_mut().enumerate() {
3438 item.table_name = if item.allow_unqualified_references {
3439 Some(PartialItemName {
3440 database: None,
3441 schema: None,
3442 item: table_name.clone(),
3443 })
3444 } else {
3445 None
3479 };
3480 item.column_name = columns
3481 .get(i)
3482 .map(|a| normalize::column_name(a.clone()))
3483 .unwrap_or_else(|| item.column_name.clone());
3484 }
3485 }
3486 Ok(scope)
3487}
3488
3489fn invent_column_name(
3493 ecx: &ExprContext,
3494 expr: &Expr<Aug>,
3495 table_func_names: &BTreeMap<String, Ident>,
3496) -> Result<Option<ColumnName>, PlanError> {
3497 #[derive(Debug)]
3504 enum NameQuality {
3505 Low,
3506 High,
3507 }
3508
3509 fn invent(
3510 ecx: &ExprContext,
3511 expr: &Expr<Aug>,
3512 table_func_names: &BTreeMap<String, Ident>,
3513 ) -> Result<Option<(ColumnName, NameQuality)>, PlanError> {
3514 Ok(match expr {
3515 Expr::Identifier(names) => {
3516 if let [name] = names.as_slice() {
3517 if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3518 return Ok(Some((
3519 normalize::column_name(table_func_name.clone()),
3520 NameQuality::High,
3521 )));
3522 }
3523 }
3524 names
3525 .last()
3526 .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3527 }
3528 Expr::Value(v) => match v {
3529 Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3532 Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3533 _ => None,
3534 },
3535 Expr::Function(func) => {
3536 let (schema, item) = match &func.name {
3537 ResolvedItemName::Item {
3538 qualifiers,
3539 full_name,
3540 ..
3541 } => (&qualifiers.schema_spec, full_name.item.clone()),
3542 _ => {
3545 bail_internal!("function name did not resolve to an item: {:?}", func.name)
3546 }
3547 };
3548
3549 if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3550 || schema
3551 == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3552 {
3553 None
3554 } else {
3555 Some((item.into(), NameQuality::High))
3556 }
3557 }
3558 Expr::HomogenizingFunction { function, .. } => Some((
3559 function.to_string().to_lowercase().into(),
3560 NameQuality::High,
3561 )),
3562 Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3563 Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3564 Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3565 Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3566 Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names)? {
3567 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3568 _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3569 },
3570 Expr::Case { else_result, .. } => {
3571 let inner = match else_result.as_ref() {
3572 Some(else_result) => invent(ecx, else_result, table_func_names)?,
3573 None => None,
3574 };
3575 match inner {
3576 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3577 _ => Some(("case".into(), NameQuality::Low)),
3578 }
3579 }
3580 Expr::FieldAccess { field, .. } => {
3581 Some((normalize::column_name(field.clone()), NameQuality::High))
3582 }
3583 Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3584 Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names)?,
3585 Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3586 let Ok((_expr, scope)) = plan_nested_query(&mut ecx.derived_query_context(), query)
3594 else {
3595 return Ok(None);
3596 };
3597 scope
3598 .items
3599 .first()
3600 .map(|name| (name.column_name.clone(), NameQuality::High))
3601 }
3602 Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3603 _ => None,
3604 })
3605 }
3606
3607 Ok(invent(ecx, expr, table_func_names)?.map(|(name, _quality)| name))
3608}
3609
3610#[derive(Debug)]
3611enum ExpandedSelectItem<'a> {
3612 InputOrdinal(usize),
3613 Expr(Cow<'a, Expr<Aug>>),
3614}
3615
3616impl ExpandedSelectItem<'_> {
3617 fn as_expr(&self) -> Option<&Expr<Aug>> {
3618 match self {
3619 ExpandedSelectItem::InputOrdinal(_) => None,
3620 ExpandedSelectItem::Expr(expr) => Some(expr),
3621 }
3622 }
3623}
3624
3625fn expand_select_item<'a>(
3626 ecx: &ExprContext,
3627 s: &'a SelectItem<Aug>,
3628 table_func_names: &BTreeMap<String, Ident>,
3629) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3630 match s {
3631 SelectItem::Expr {
3632 expr: Expr::QualifiedWildcard(table_name),
3633 alias: _,
3634 } => {
3635 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3636 let table_name =
3637 normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3638 let out: Vec<_> = ecx
3639 .scope
3640 .items
3641 .iter()
3642 .enumerate()
3643 .filter(|(_i, item)| item.is_from_table(&table_name))
3644 .map(|(i, item)| {
3645 let name = item.column_name.clone();
3646 (ExpandedSelectItem::InputOrdinal(i), name)
3647 })
3648 .collect();
3649 if out.is_empty() {
3650 sql_bail!("no table named '{}' in scope", table_name);
3651 }
3652 Ok(out)
3653 }
3654 SelectItem::Expr {
3655 expr: Expr::WildcardAccess(sql_expr),
3656 alias: _,
3657 } => {
3658 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3659 let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3665 let fields = match ecx.scalar_type(&expr) {
3666 SqlScalarType::Record { fields, .. } => fields,
3667 ty => sql_bail!(
3668 "type {} is not composite",
3669 ecx.humanize_sql_scalar_type(&ty, false)
3670 ),
3671 };
3672 let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3673 if let Expr::Identifier(ident) = sql_expr.as_ref() {
3674 if let [name] = ident.as_slice() {
3675 if let Ok(items) = ecx.scope.items_from_table(
3676 &[],
3677 &PartialItemName {
3678 database: None,
3679 schema: None,
3680 item: name.as_str().to_string(),
3681 },
3682 ) {
3683 for (_, item) in items {
3684 if item
3685 .is_exists_column_for_a_table_function_that_was_in_the_target_list
3686 {
3687 skip_cols.insert(item.column_name.clone());
3688 }
3689 }
3690 }
3691 }
3692 }
3693 let items = fields
3694 .iter()
3695 .filter_map(|(name, _ty)| {
3696 if skip_cols.contains(name) {
3697 None
3698 } else {
3699 let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3700 expr: sql_expr.clone(),
3701 field: name.clone().into(),
3702 }));
3703 Some((item, name.clone()))
3704 }
3705 })
3706 .collect();
3707 Ok(items)
3708 }
3709 SelectItem::Wildcard => {
3710 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3711 let items: Vec<_> = ecx
3712 .scope
3713 .items
3714 .iter()
3715 .enumerate()
3716 .filter(|(_i, item)| item.allow_unqualified_references)
3717 .map(|(i, item)| {
3718 let name = item.column_name.clone();
3719 (ExpandedSelectItem::InputOrdinal(i), name)
3720 })
3721 .collect();
3722
3723 Ok(items)
3724 }
3725 SelectItem::Expr { expr, alias } => {
3726 let name = match alias.clone().map(normalize::column_name) {
3727 Some(name) => name,
3728 None => invent_column_name(ecx, expr, table_func_names)?
3729 .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into()),
3730 };
3731 Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3732 }
3733 }
3734}
3735
3736fn plan_join(
3737 left_qcx: &QueryContext,
3738 left: HirRelationExpr,
3739 left_scope: Scope,
3740 join: &Join<Aug>,
3741) -> Result<(HirRelationExpr, Scope), PlanError> {
3742 const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3743 let (kind, constraint) = match &join.join_operator {
3744 JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3745 JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3746 JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3747 JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3748 JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3749 };
3750
3751 let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3752 if !kind.can_be_correlated() {
3753 for item in &mut right_qcx.outer_scopes[0].items {
3754 item.error_if_referenced =
3759 Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3760 table: table.cloned(),
3761 column: column.clone(),
3762 });
3763 }
3764 }
3765 let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3766
3767 let (expr, scope) = match constraint {
3768 JoinConstraint::On(expr) => {
3769 let product_scope = left_scope.product(right_scope)?;
3770 let ecx = &ExprContext {
3771 qcx: left_qcx,
3772 name: "ON clause",
3773 scope: &product_scope,
3774 relation_type: &SqlRelationType::new(
3775 left_qcx
3776 .relation_type(&left)
3777 .column_types
3778 .into_iter()
3779 .chain(right_qcx.relation_type(&right).column_types)
3780 .collect(),
3781 ),
3782 allow_aggregates: false,
3783 allow_subqueries: true,
3784 allow_parameters: true,
3785 allow_windows: false,
3786 };
3787 let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3788 let joined = left.join(right, on, kind);
3789 (joined, product_scope)
3790 }
3791 JoinConstraint::Using { columns, alias } => {
3792 let column_names = columns
3793 .iter()
3794 .map(|ident| normalize::column_name(ident.clone()))
3795 .collect::<Vec<_>>();
3796
3797 plan_using_constraint(
3798 &column_names,
3799 left_qcx,
3800 left,
3801 left_scope,
3802 &right_qcx,
3803 right,
3804 right_scope,
3805 kind,
3806 alias.as_ref(),
3807 )?
3808 }
3809 JoinConstraint::Natural => {
3810 *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3813 *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3814 let left_column_names = left_scope.column_names();
3815 let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3816 let column_names: Vec<_> = left_column_names
3817 .filter(|col| right_column_names.contains(col))
3818 .cloned()
3819 .collect();
3820 plan_using_constraint(
3821 &column_names,
3822 left_qcx,
3823 left,
3824 left_scope,
3825 &right_qcx,
3826 right,
3827 right_scope,
3828 kind,
3829 None,
3830 )?
3831 }
3832 };
3833 Ok((expr, scope))
3834}
3835
3836#[allow(clippy::too_many_arguments)]
3838fn plan_using_constraint(
3839 column_names: &[ColumnName],
3840 left_qcx: &QueryContext,
3841 left: HirRelationExpr,
3842 left_scope: Scope,
3843 right_qcx: &QueryContext,
3844 right: HirRelationExpr,
3845 right_scope: Scope,
3846 kind: JoinKind,
3847 alias: Option<&Ident>,
3848) -> Result<(HirRelationExpr, Scope), PlanError> {
3849 let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3850
3851 let mut unique_column_names = BTreeSet::new();
3854 for c in column_names {
3855 if !unique_column_names.insert(c) {
3856 return Err(PlanError::Unsupported {
3857 feature: format!(
3858 "column name {} appears more than once in USING clause",
3859 c.quoted()
3860 ),
3861 discussion_no: None,
3862 });
3863 }
3864 }
3865
3866 let alias_item_name = alias.map(|alias| PartialItemName {
3867 database: None,
3868 schema: None,
3869 item: alias.clone().to_string(),
3870 });
3871
3872 if let Some(alias_item_name) = &alias_item_name {
3873 for partial_item_name in both_scope.table_names() {
3874 if partial_item_name.matches(alias_item_name) {
3875 sql_bail!(
3876 "table name \"{}\" specified more than once",
3877 alias_item_name
3878 )
3879 }
3880 }
3881 }
3882
3883 let ecx = &ExprContext {
3884 qcx: right_qcx,
3885 name: "USING clause",
3886 scope: &both_scope,
3887 relation_type: &SqlRelationType::new(
3888 left_qcx
3889 .relation_type(&left)
3890 .column_types
3891 .into_iter()
3892 .chain(right_qcx.relation_type(&right).column_types)
3893 .collect(),
3894 ),
3895 allow_aggregates: false,
3896 allow_subqueries: false,
3897 allow_parameters: false,
3898 allow_windows: false,
3899 };
3900
3901 let mut join_exprs = vec![];
3902 let mut map_exprs = vec![];
3903 let mut new_items = vec![];
3904 let mut join_cols = vec![];
3905 let mut hidden_cols = vec![];
3906
3907 for column_name in column_names {
3908 let (lhs, lhs_name) = left_scope.resolve_using_column(
3910 column_name,
3911 JoinSide::Left,
3912 &mut left_qcx.name_manager.borrow_mut(),
3913 )?;
3914 let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3915 column_name,
3916 JoinSide::Right,
3917 &mut right_qcx.name_manager.borrow_mut(),
3918 )?;
3919
3920 rhs.column += left_scope.len();
3922
3923 let mut exprs = coerce_homogeneous_exprs(
3925 &ecx.with_name(&format!(
3926 "NATURAL/USING join column {}",
3927 column_name.quoted()
3928 )),
3929 vec![
3930 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3931 lhs,
3932 Arc::clone(&lhs_name),
3933 )),
3934 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3935 rhs,
3936 Arc::clone(&rhs_name),
3937 )),
3938 ],
3939 None,
3940 )?;
3941 let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3942
3943 match kind {
3944 JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3945 join_cols.push(lhs.column);
3946 hidden_cols.push(rhs.column);
3947 }
3948 JoinKind::RightOuter => {
3949 join_cols.push(rhs.column);
3950 hidden_cols.push(lhs.column);
3951 }
3952 JoinKind::FullOuter => {
3953 join_cols.push(both_scope.items.len() + map_exprs.len());
3956 hidden_cols.push(lhs.column);
3957 hidden_cols.push(rhs.column);
3958 map_exprs.push(HirScalarExpr::call_variadic(
3959 Coalesce,
3960 vec![expr1.clone(), expr2.clone()],
3961 ));
3962 new_items.push(ScopeItem::from_column_name(column_name));
3963 }
3964 }
3965
3966 if alias_item_name.is_some() {
3971 let new_item_col = both_scope.items.len() + new_items.len();
3972 join_cols.push(new_item_col);
3973 hidden_cols.push(new_item_col);
3974
3975 new_items.push(ScopeItem::from_name(
3976 alias_item_name.clone(),
3977 column_name.clone().to_string(),
3978 ));
3979
3980 map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
3984 }
3985
3986 join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
3987 }
3988 both_scope.items.extend(new_items);
3989
3990 for c in hidden_cols {
3994 both_scope.items[c].allow_unqualified_references = false;
3995 }
3996
3997 let project_key = join_cols
3999 .into_iter()
4000 .chain(0..both_scope.items.len())
4001 .unique()
4002 .collect::<Vec<_>>();
4003
4004 both_scope = both_scope.project(&project_key);
4005
4006 let on = HirScalarExpr::variadic_and(join_exprs);
4007
4008 let both = left
4009 .join(right, on, kind)
4010 .map(map_exprs)
4011 .project(project_key);
4012 Ok((both, both_scope))
4013}
4014
4015pub fn plan_expr<'a>(
4016 ecx: &'a ExprContext,
4017 e: &Expr<Aug>,
4018) -> Result<CoercibleScalarExpr, PlanError> {
4019 ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
4020}
4021
4022fn plan_expr_inner<'a>(
4023 ecx: &'a ExprContext,
4024 e: &Expr<Aug>,
4025) -> Result<CoercibleScalarExpr, PlanError> {
4026 if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4027 return Ok(HirScalarExpr::named_column(
4029 i,
4030 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4031 )
4032 .into());
4033 }
4034
4035 match e {
4036 Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4038 Ok(plan_identifier(ecx, names)?.into())
4039 }
4040
4041 Expr::Value(val) => plan_literal(val),
4043 Expr::Parameter(n) => plan_parameter(ecx, *n),
4044 Expr::Array(exprs) => plan_array(ecx, exprs, None),
4045 Expr::List(exprs) => plan_list(ecx, exprs, None),
4046 Expr::Map(exprs) => plan_map(ecx, exprs, None),
4047 Expr::Row { exprs } => plan_row(ecx, exprs),
4048
4049 Expr::Op { op, expr1, expr2 } => {
4051 Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4052 }
4053 Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4054 Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4055
4056 Expr::Not { expr } => plan_not(ecx, expr),
4058 Expr::And { left, right } => plan_and(ecx, left, right),
4059 Expr::Or { left, right } => plan_or(ecx, left, right),
4060 Expr::IsExpr {
4061 expr,
4062 construct,
4063 negated,
4064 } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4065 Expr::Case {
4066 operand,
4067 conditions,
4068 results,
4069 else_result,
4070 } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4071 Expr::HomogenizingFunction { function, exprs } => {
4072 plan_homogenizing_function(ecx, function, exprs)
4073 }
4074 Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4075 ecx,
4076 &None,
4077 &[l_expr.clone().equals(*r_expr.clone())],
4078 &[Expr::null()],
4079 &Some(Box::new(*l_expr.clone())),
4080 )?
4081 .into()),
4082 Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4083 Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4084 Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4085 Expr::Like {
4086 expr,
4087 pattern,
4088 escape,
4089 case_insensitive,
4090 negated,
4091 } => Ok(plan_like(
4092 ecx,
4093 expr,
4094 pattern,
4095 escape.as_deref(),
4096 *case_insensitive,
4097 *negated,
4098 )?
4099 .into()),
4100
4101 Expr::InList {
4102 expr,
4103 list,
4104 negated,
4105 } => plan_in_list(ecx, expr, list, negated),
4106
4107 Expr::Exists(query) => plan_exists(ecx, query),
4109 Expr::Subquery(query) => plan_subquery(ecx, query),
4110 Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4111 Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4112 Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4113 Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4114 Expr::Nested(_) => bail_internal!("Expr::Nested should have been desugared"),
4115 Expr::InSubquery { .. } => {
4116 bail_internal!("Expr::InSubquery should have been desugared")
4117 }
4118 Expr::AnyExpr { .. } => {
4119 bail_internal!("Expr::AnyExpr should have been desugared")
4120 }
4121 Expr::AllExpr { .. } => {
4122 bail_internal!("Expr::AllExpr should have been desugared")
4123 }
4124 Expr::AnySubquery { .. } => {
4125 bail_internal!("Expr::AnySubquery should have been desugared")
4126 }
4127 Expr::AllSubquery { .. } => {
4128 bail_internal!("Expr::AllSubquery should have been desugared")
4129 }
4130 Expr::Between { .. } => {
4131 bail_internal!("Expr::Between should have been desugared")
4132 }
4133 }
4134}
4135
4136fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4137 if !ecx.allow_parameters {
4138 return Err(PlanError::UnknownParameter(n));
4142 }
4143 if n == 0 || n > 65536 {
4144 return Err(PlanError::UnknownParameter(n));
4145 }
4146 if ecx.param_types().borrow().contains_key(&n) {
4147 Ok(HirScalarExpr::parameter(n).into())
4148 } else {
4149 Ok(CoercibleScalarExpr::Parameter(n))
4150 }
4151}
4152
4153fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4154 let mut out = vec![];
4155 for e in exprs {
4156 out.push(plan_expr(ecx, e)?);
4157 }
4158 Ok(CoercibleScalarExpr::LiteralRecord(out))
4159}
4160
4161fn plan_cast(
4162 ecx: &ExprContext,
4163 expr: &Expr<Aug>,
4164 data_type: &ResolvedDataType,
4165) -> Result<CoercibleScalarExpr, PlanError> {
4166 let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4167 let expr = match expr {
4168 Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4177 Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4178 Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4179 _ => plan_expr(ecx, expr)?,
4180 };
4181 let ecx = &ecx.with_name("CAST");
4182 let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4183 let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4184 Ok(expr.into())
4185}
4186
4187fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4188 let ecx = ecx.with_name("NOT argument");
4189 Ok(plan_expr(&ecx, expr)?
4190 .type_as(&ecx, &SqlScalarType::Bool)?
4191 .call_unary(UnaryFunc::Not(expr_func::Not))
4192 .into())
4193}
4194
4195fn plan_and(
4196 ecx: &ExprContext,
4197 left: &Expr<Aug>,
4198 right: &Expr<Aug>,
4199) -> Result<CoercibleScalarExpr, PlanError> {
4200 let ecx = ecx.with_name("AND argument");
4201 Ok(HirScalarExpr::variadic_and(vec![
4202 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4203 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4204 ])
4205 .into())
4206}
4207
4208fn plan_or(
4209 ecx: &ExprContext,
4210 left: &Expr<Aug>,
4211 right: &Expr<Aug>,
4212) -> Result<CoercibleScalarExpr, PlanError> {
4213 let ecx = ecx.with_name("OR argument");
4214 Ok(HirScalarExpr::variadic_or(vec![
4215 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4216 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4217 ])
4218 .into())
4219}
4220
4221fn plan_in_list(
4222 ecx: &ExprContext,
4223 lhs: &Expr<Aug>,
4224 list: &Vec<Expr<Aug>>,
4225 negated: &bool,
4226) -> Result<CoercibleScalarExpr, PlanError> {
4227 let ecx = ecx.with_name("IN list");
4228 let or = HirScalarExpr::variadic_or(
4229 list.into_iter()
4230 .map(|e| {
4231 let eq = lhs.clone().equals(e.clone());
4232 plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4233 })
4234 .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4235 );
4236 Ok(if *negated {
4237 or.call_unary(UnaryFunc::Not(expr_func::Not))
4238 } else {
4239 or
4240 }
4241 .into())
4242}
4243
4244fn plan_homogenizing_function(
4245 ecx: &ExprContext,
4246 function: &HomogenizingFunction,
4247 exprs: &[Expr<Aug>],
4248) -> Result<CoercibleScalarExpr, PlanError> {
4249 assert!(!exprs.is_empty()); let expr = HirScalarExpr::call_variadic(
4251 match function {
4252 HomogenizingFunction::Coalesce => VariadicFunc::from(Coalesce),
4253 HomogenizingFunction::Greatest => VariadicFunc::from(Greatest),
4254 HomogenizingFunction::Least => VariadicFunc::from(Least),
4255 },
4256 coerce_homogeneous_exprs(
4257 &ecx.with_name(&function.to_string().to_lowercase()),
4258 plan_exprs(ecx, exprs)?,
4259 None,
4260 )?,
4261 );
4262 Ok(expr.into())
4263}
4264
4265fn plan_field_access(
4266 ecx: &ExprContext,
4267 expr: &Expr<Aug>,
4268 field: &Ident,
4269) -> Result<CoercibleScalarExpr, PlanError> {
4270 let field = normalize::column_name(field.clone());
4271 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4272 let ty = ecx.scalar_type(&expr);
4273 let i = match &ty {
4274 SqlScalarType::Record { fields, .. } => {
4275 fields.iter().position(|(name, _ty)| *name == field)
4276 }
4277 ty => sql_bail!(
4278 "column notation applied to type {}, which is not a composite type",
4279 ecx.humanize_sql_scalar_type(ty, false)
4280 ),
4281 };
4282 match i {
4283 None => sql_bail!(
4284 "field {} not found in data type {}",
4285 field,
4286 ecx.humanize_sql_scalar_type(&ty, false)
4287 ),
4288 Some(i) => Ok(expr
4289 .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4290 .into()),
4291 }
4292}
4293
4294fn plan_subscript(
4295 ecx: &ExprContext,
4296 expr: &Expr<Aug>,
4297 positions: &[SubscriptPosition<Aug>],
4298) -> Result<CoercibleScalarExpr, PlanError> {
4299 assert!(
4300 !positions.is_empty(),
4301 "subscript expression must contain at least one position"
4302 );
4303
4304 let ecx = &ecx.with_name("subscripting");
4305 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4306 let ty = ecx.scalar_type(&expr);
4307 match &ty {
4308 SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4309 ecx,
4310 expr,
4311 positions,
4312 if ty == SqlScalarType::Int2Vector {
4316 1
4317 } else {
4318 0
4319 },
4320 ),
4321 SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4322 SqlScalarType::List { element_type, .. } => {
4323 let elem_type_name = ecx.humanize_sql_scalar_type(element_type, false);
4325 let n_layers = ty.unwrap_list_n_layers();
4326 plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4327 }
4328 ty => sql_bail!(
4329 "cannot subscript type {}",
4330 ecx.humanize_sql_scalar_type(ty, false)
4331 ),
4332 }
4333}
4334
4335fn extract_scalar_subscript_from_positions<'a>(
4339 positions: &'a [SubscriptPosition<Aug>],
4340 expr_type_name: &str,
4341) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4342 let mut scalar_subscripts = Vec::with_capacity(positions.len());
4343 for p in positions {
4344 if p.explicit_slice {
4345 sql_bail!("{} subscript does not support slices", expr_type_name);
4346 }
4347 assert!(
4348 p.end.is_none(),
4349 "index-appearing subscripts cannot have end value"
4350 );
4351 scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4352 }
4353 Ok(scalar_subscripts)
4354}
4355
4356fn plan_subscript_array(
4357 ecx: &ExprContext,
4358 expr: HirScalarExpr,
4359 positions: &[SubscriptPosition<Aug>],
4360 offset: i64,
4361) -> Result<CoercibleScalarExpr, PlanError> {
4362 let mut exprs = Vec::with_capacity(positions.len() + 1);
4363 exprs.push(expr);
4364
4365 let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4368
4369 for i in indexes {
4370 exprs.push(plan_expr(ecx, i)?.cast_to(
4371 ecx,
4372 CastContext::Explicit,
4373 &SqlScalarType::Int64,
4374 )?);
4375 }
4376
4377 Ok(HirScalarExpr::call_variadic(ArrayIndex { offset }, exprs).into())
4378}
4379
4380fn plan_subscript_list(
4381 ecx: &ExprContext,
4382 mut expr: HirScalarExpr,
4383 positions: &[SubscriptPosition<Aug>],
4384 mut remaining_layers: usize,
4385 elem_type_name: &str,
4386) -> Result<CoercibleScalarExpr, PlanError> {
4387 let mut i = 0;
4388
4389 while i < positions.len() {
4390 let j = positions[i..]
4392 .iter()
4393 .position(|p| p.explicit_slice)
4394 .unwrap_or(positions.len() - i);
4395 if j != 0 {
4396 let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4397 let (n, e) = plan_index_list(
4398 ecx,
4399 expr,
4400 indexes.as_slice(),
4401 remaining_layers,
4402 elem_type_name,
4403 )?;
4404 remaining_layers = n;
4405 expr = e;
4406 i += j;
4407 }
4408
4409 let j = positions[i..]
4411 .iter()
4412 .position(|p| !p.explicit_slice)
4413 .unwrap_or(positions.len() - i);
4414 if j != 0 {
4415 expr = plan_slice_list(
4416 ecx,
4417 expr,
4418 &positions[i..i + j],
4419 remaining_layers,
4420 elem_type_name,
4421 )?;
4422 i += j;
4423 }
4424 }
4425
4426 Ok(expr.into())
4427}
4428
4429fn plan_index_list(
4430 ecx: &ExprContext,
4431 expr: HirScalarExpr,
4432 indexes: &[&Expr<Aug>],
4433 n_layers: usize,
4434 elem_type_name: &str,
4435) -> Result<(usize, HirScalarExpr), PlanError> {
4436 let depth = indexes.len();
4437
4438 if depth > n_layers {
4439 if n_layers == 0 {
4440 sql_bail!("cannot subscript type {}", elem_type_name)
4441 } else {
4442 sql_bail!(
4443 "cannot index into {} layers; list only has {} layer{}",
4444 depth,
4445 n_layers,
4446 if n_layers == 1 { "" } else { "s" }
4447 )
4448 }
4449 }
4450
4451 let mut exprs = Vec::with_capacity(depth + 1);
4452 exprs.push(expr);
4453
4454 for i in indexes {
4455 exprs.push(plan_expr(ecx, i)?.cast_to(
4456 ecx,
4457 CastContext::Explicit,
4458 &SqlScalarType::Int64,
4459 )?);
4460 }
4461
4462 Ok((
4463 n_layers - depth,
4464 HirScalarExpr::call_variadic(ListIndex, exprs),
4465 ))
4466}
4467
4468fn plan_slice_list(
4469 ecx: &ExprContext,
4470 expr: HirScalarExpr,
4471 slices: &[SubscriptPosition<Aug>],
4472 n_layers: usize,
4473 elem_type_name: &str,
4474) -> Result<HirScalarExpr, PlanError> {
4475 if n_layers == 0 {
4476 sql_bail!("cannot subscript type {}", elem_type_name)
4477 }
4478
4479 let mut exprs = Vec::with_capacity(slices.len() + 1);
4481 exprs.push(expr);
4482 let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4484 Ok(match position {
4485 Some(p) => {
4486 plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4487 }
4488 None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4489 })
4490 };
4491 for p in slices {
4492 let start = extract_position_or_default(p.start.as_ref(), 1)?;
4493 let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4494 exprs.push(start);
4495 exprs.push(end);
4496 }
4497
4498 Ok(HirScalarExpr::call_variadic(ListSliceLinear, exprs))
4499}
4500
4501fn plan_like(
4502 ecx: &ExprContext,
4503 expr: &Expr<Aug>,
4504 pattern: &Expr<Aug>,
4505 escape: Option<&Expr<Aug>>,
4506 case_insensitive: bool,
4507 not: bool,
4508) -> Result<HirScalarExpr, PlanError> {
4509 use CastContext::Implicit;
4510 let ecx = ecx.with_name("LIKE argument");
4511 let expr = plan_expr(&ecx, expr)?;
4512 let haystack = match ecx.scalar_type(&expr) {
4513 CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4514 .type_as(&ecx, ty)?
4515 .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4516 _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4517 };
4518 let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4519 if let Some(escape) = escape {
4520 pattern = pattern.call_binary(
4521 plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4522 expr_func::LikeEscape,
4523 );
4524 }
4525 let func: BinaryFunc = if case_insensitive {
4526 expr_func::IsLikeMatchCaseInsensitive.into()
4527 } else {
4528 expr_func::IsLikeMatchCaseSensitive.into()
4529 };
4530 let like = haystack.call_binary(pattern, func);
4531 if not {
4532 Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4533 } else {
4534 Ok(like)
4535 }
4536}
4537
4538fn plan_subscript_jsonb(
4539 ecx: &ExprContext,
4540 expr: HirScalarExpr,
4541 positions: &[SubscriptPosition<Aug>],
4542) -> Result<CoercibleScalarExpr, PlanError> {
4543 use CastContext::Implicit;
4544 use SqlScalarType::{Int64, String};
4545
4546 let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4549
4550 let mut exprs = Vec::with_capacity(subscripts.len());
4551 for s in subscripts {
4552 let subscript = plan_expr(ecx, s)?;
4553 let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4554 subscript
4555 } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4556 typeconv::to_string(ecx, subscript)
4560 } else {
4561 sql_bail!("jsonb subscript type must be coercible to integer or text");
4562 };
4563 exprs.push(subscript);
4564 }
4565
4566 let expr = expr.call_binary(
4569 HirScalarExpr::call_variadic(
4570 ArrayCreate {
4571 elem_type: SqlScalarType::String,
4572 },
4573 exprs,
4574 ),
4575 expr_func::JsonbGetPath,
4576 );
4577 Ok(expr.into())
4578}
4579
4580fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4581 if !ecx.allow_subqueries {
4582 sql_bail!("{} does not allow subqueries", ecx.name)
4583 }
4584 let mut qcx = ecx.derived_query_context();
4585 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4586 Ok(expr.exists().into())
4587}
4588
4589fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4590 if !ecx.allow_subqueries {
4591 sql_bail!("{} does not allow subqueries", ecx.name)
4592 }
4593 let mut qcx = ecx.derived_query_context();
4594 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4595 let column_types = qcx.relation_type(&expr).column_types;
4596 if column_types.len() != 1 {
4597 sql_bail!(
4598 "Expected subselect to return 1 column, got {} columns",
4599 column_types.len()
4600 );
4601 }
4602 Ok(expr.select().into())
4603}
4604
4605fn plan_list_subquery(
4606 ecx: &ExprContext,
4607 query: &Query<Aug>,
4608) -> Result<CoercibleScalarExpr, PlanError> {
4609 plan_vector_like_subquery(
4610 ecx,
4611 query,
4612 |_| false,
4613 |elem_type| ListCreate { elem_type }.into(),
4614 |order_by| AggregateFunc::ListConcat { order_by },
4615 expr_func::ListListConcat.into(),
4616 |elem_type| {
4617 HirScalarExpr::literal(
4618 Datum::empty_list(),
4619 SqlScalarType::List {
4620 element_type: Box::new(elem_type),
4621 custom_id: None,
4622 },
4623 )
4624 },
4625 "list",
4626 )
4627}
4628
4629fn plan_array_subquery(
4630 ecx: &ExprContext,
4631 query: &Query<Aug>,
4632) -> Result<CoercibleScalarExpr, PlanError> {
4633 plan_vector_like_subquery(
4634 ecx,
4635 query,
4636 |elem_type| {
4637 matches!(
4638 elem_type,
4639 SqlScalarType::Char { .. }
4640 | SqlScalarType::Array { .. }
4641 | SqlScalarType::List { .. }
4642 | SqlScalarType::Map { .. }
4643 )
4644 },
4645 |elem_type| ArrayCreate { elem_type }.into(),
4646 |order_by| AggregateFunc::ArrayConcat { order_by },
4647 expr_func::ArrayArrayConcat.into(),
4648 |elem_type| {
4649 HirScalarExpr::literal(
4650 Datum::empty_array(),
4651 SqlScalarType::Array(Box::new(elem_type)),
4652 )
4653 },
4654 "[]",
4655 )
4656}
4657
4658fn plan_vector_like_subquery<F1, F2, F3, F4>(
4660 ecx: &ExprContext,
4661 query: &Query<Aug>,
4662 is_unsupported_type: F1,
4663 vector_create: F2,
4664 aggregate_concat: F3,
4665 binary_concat: BinaryFunc,
4666 empty_literal: F4,
4667 vector_type_string: &str,
4668) -> Result<CoercibleScalarExpr, PlanError>
4669where
4670 F1: Fn(&SqlScalarType) -> bool,
4671 F2: Fn(SqlScalarType) -> VariadicFunc,
4672 F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4673 F4: Fn(SqlScalarType) -> HirScalarExpr,
4674{
4675 if !ecx.allow_subqueries {
4676 sql_bail!("{} does not allow subqueries", ecx.name)
4677 }
4678
4679 let mut qcx = ecx.derived_query_context();
4680 let mut planned_query = plan_query(&mut qcx, query)?;
4681 if planned_query.limit.is_some()
4682 || !planned_query
4683 .offset
4684 .clone()
4685 .try_into_literal_int64()
4686 .is_ok_and(|offset| offset == 0)
4687 {
4688 planned_query.expr = HirRelationExpr::top_k(
4689 planned_query.expr,
4690 vec![],
4691 planned_query.order_by.clone(),
4692 planned_query.limit,
4693 planned_query.offset,
4694 planned_query.group_size_hints.limit_input_group_size,
4695 );
4696 }
4697
4698 if planned_query.project.len() != 1 {
4699 sql_bail!(
4700 "Expected subselect to return 1 column, got {} columns",
4701 planned_query.project.len()
4702 );
4703 }
4704
4705 let project_column = *planned_query.project.get(0).unwrap();
4706 let elem_type = qcx
4707 .relation_type(&planned_query.expr)
4708 .column_types
4709 .get(project_column)
4710 .cloned()
4711 .unwrap()
4712 .scalar_type();
4713
4714 if is_unsupported_type(&elem_type) {
4715 bail_unsupported!(format!(
4716 "cannot build array from subquery because return type {}{}",
4717 ecx.humanize_sql_scalar_type(&elem_type, false),
4718 vector_type_string
4719 ));
4720 }
4721
4722 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4725 vector_create(elem_type.clone()),
4726 vec![HirScalarExpr::column(project_column)],
4727 ))
4728 .chain(
4729 planned_query
4730 .order_by
4731 .iter()
4732 .map(|co| HirScalarExpr::column(co.column)),
4733 )
4734 .collect();
4735
4736 let aggregation_projection = vec![0];
4740 let aggregation_order_by = planned_query
4741 .order_by
4742 .into_iter()
4743 .enumerate()
4744 .map(|(i, order)| ColumnOrder { column: i, ..order })
4745 .collect();
4746
4747 let reduced_expr = planned_query
4748 .expr
4749 .reduce(
4750 vec![],
4751 vec![AggregateExpr {
4752 func: aggregate_concat(aggregation_order_by),
4753 expr: Box::new(HirScalarExpr::call_variadic(
4754 RecordCreate {
4755 field_names: iter::repeat(ColumnName::from(""))
4756 .take(aggregation_exprs.len())
4757 .collect(),
4758 },
4759 aggregation_exprs,
4760 )),
4761 distinct: false,
4762 }],
4763 None,
4764 )
4765 .project(aggregation_projection);
4766
4767 Ok(reduced_expr
4769 .select()
4770 .call_binary(empty_literal(elem_type), binary_concat)
4771 .into())
4772}
4773
4774fn plan_map_subquery(
4775 ecx: &ExprContext,
4776 query: &Query<Aug>,
4777) -> Result<CoercibleScalarExpr, PlanError> {
4778 if !ecx.allow_subqueries {
4779 sql_bail!("{} does not allow subqueries", ecx.name)
4780 }
4781
4782 let mut qcx = ecx.derived_query_context();
4783 let mut query = plan_query(&mut qcx, query)?;
4784 if query.limit.is_some()
4785 || !query
4786 .offset
4787 .clone()
4788 .try_into_literal_int64()
4789 .is_ok_and(|offset| offset == 0)
4790 {
4791 query.expr = HirRelationExpr::top_k(
4792 query.expr,
4793 vec![],
4794 query.order_by.clone(),
4795 query.limit,
4796 query.offset,
4797 query.group_size_hints.limit_input_group_size,
4798 );
4799 }
4800 if query.project.len() != 2 {
4801 sql_bail!(
4802 "expected map subquery to return 2 columns, got {} columns",
4803 query.project.len()
4804 );
4805 }
4806
4807 let query_types = qcx.relation_type(&query.expr).column_types;
4808 let key_column = query.project[0];
4809 let key_type = query_types[key_column].clone().scalar_type();
4810 let value_column = query.project[1];
4811 let value_type = query_types[value_column].clone().scalar_type();
4812
4813 if key_type != SqlScalarType::String {
4814 sql_bail!("cannot build map from subquery because first column is not of type text");
4815 }
4816
4817 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4818 RecordCreate {
4819 field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4820 },
4821 vec![
4822 HirScalarExpr::column(key_column),
4823 HirScalarExpr::column(value_column),
4824 ],
4825 ))
4826 .chain(
4827 query
4828 .order_by
4829 .iter()
4830 .map(|co| HirScalarExpr::column(co.column)),
4831 )
4832 .collect();
4833
4834 let expr = query
4835 .expr
4836 .reduce(
4837 vec![],
4838 vec![AggregateExpr {
4839 func: AggregateFunc::MapAgg {
4840 order_by: query
4841 .order_by
4842 .into_iter()
4843 .enumerate()
4844 .map(|(i, order)| ColumnOrder { column: i, ..order })
4845 .collect(),
4846 value_type: value_type.clone(),
4847 },
4848 expr: Box::new(HirScalarExpr::call_variadic(
4849 RecordCreate {
4850 field_names: iter::repeat(ColumnName::from(""))
4851 .take(aggregation_exprs.len())
4852 .collect(),
4853 },
4854 aggregation_exprs,
4855 )),
4856 distinct: false,
4857 }],
4858 None,
4859 )
4860 .project(vec![0]);
4861
4862 let expr = HirScalarExpr::call_variadic(
4864 Coalesce,
4865 vec![
4866 expr.select(),
4867 HirScalarExpr::literal(
4868 Datum::empty_map(),
4869 SqlScalarType::Map {
4870 value_type: Box::new(value_type),
4871 custom_id: None,
4872 },
4873 ),
4874 ],
4875 );
4876
4877 Ok(expr.into())
4878}
4879
4880fn plan_collate(
4881 ecx: &ExprContext,
4882 expr: &Expr<Aug>,
4883 collation: &UnresolvedItemName,
4884) -> Result<CoercibleScalarExpr, PlanError> {
4885 if collation.0.len() == 2
4886 && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4887 && collation.0[1] == ident!("default")
4888 {
4889 plan_expr(ecx, expr)
4890 } else {
4891 bail_unsupported!("COLLATE");
4892 }
4893}
4894
4895fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4902where
4903 E: std::borrow::Borrow<Expr<Aug>>,
4904{
4905 let mut out = vec![];
4906 for expr in exprs {
4907 out.push(plan_expr(ecx, expr.borrow())?);
4908 }
4909 Ok(out)
4910}
4911
4912fn plan_array(
4914 ecx: &ExprContext,
4915 exprs: &[Expr<Aug>],
4916 type_hint: Option<&SqlScalarType>,
4917) -> Result<CoercibleScalarExpr, PlanError> {
4918 let mut out = vec![];
4920 for expr in exprs {
4921 out.push(match expr {
4922 Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4925 _ => plan_expr(ecx, expr)?,
4926 });
4927 }
4928
4929 let type_hint = match type_hint {
4931 Some(SqlScalarType::Array(elem_type)) => {
4936 let multidimensional = out.iter().any(|e| {
4937 matches!(
4938 ecx.scalar_type(e),
4939 CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4940 )
4941 });
4942 if multidimensional {
4943 type_hint
4944 } else {
4945 Some(&**elem_type)
4946 }
4947 }
4948 Some(_) => None,
4952 None => None,
4954 };
4955
4956 let (elem_type, exprs) = if exprs.is_empty() {
4958 if let Some(elem_type) = type_hint {
4959 (elem_type.clone(), vec![])
4960 } else {
4961 sql_bail!("cannot determine type of empty array");
4962 }
4963 } else {
4964 let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4965 (ecx.scalar_type(&out[0]), out)
4966 };
4967
4968 if matches!(
4974 elem_type,
4975 SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4976 ) {
4977 bail_unsupported!(format!(
4978 "{}[]",
4979 ecx.humanize_sql_scalar_type(&elem_type, false)
4980 ));
4981 }
4982
4983 Ok(HirScalarExpr::call_variadic(ArrayCreate { elem_type }, exprs).into())
4984}
4985
4986fn plan_list(
4987 ecx: &ExprContext,
4988 exprs: &[Expr<Aug>],
4989 type_hint: Option<&SqlScalarType>,
4990) -> Result<CoercibleScalarExpr, PlanError> {
4991 let (elem_type, exprs) = if exprs.is_empty() {
4992 if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
4993 (element_type.without_modifiers(), vec![])
4994 } else {
4995 sql_bail!("cannot determine type of empty list");
4996 }
4997 } else {
4998 let type_hint = match type_hint {
4999 Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
5000 _ => None,
5001 };
5002
5003 let mut out = vec![];
5004 for expr in exprs {
5005 out.push(match expr {
5006 Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
5009 _ => plan_expr(ecx, expr)?,
5010 });
5011 }
5012 let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
5013 (ecx.scalar_type(&out[0]).without_modifiers(), out)
5014 };
5015
5016 if matches!(elem_type, SqlScalarType::Char { .. }) {
5017 bail_unsupported!("char list");
5018 }
5019
5020 Ok(HirScalarExpr::call_variadic(ListCreate { elem_type }, exprs).into())
5021}
5022
5023fn plan_map(
5024 ecx: &ExprContext,
5025 entries: &[MapEntry<Aug>],
5026 type_hint: Option<&SqlScalarType>,
5027) -> Result<CoercibleScalarExpr, PlanError> {
5028 let (value_type, exprs) = if entries.is_empty() {
5029 if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
5030 (value_type.without_modifiers(), vec![])
5031 } else {
5032 sql_bail!("cannot determine type of empty map");
5033 }
5034 } else {
5035 let type_hint = match type_hint {
5036 Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5037 _ => None,
5038 };
5039
5040 let mut keys = vec![];
5041 let mut values = vec![];
5042 for MapEntry { key, value } in entries {
5043 let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5044 let value = match value {
5045 Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5048 _ => plan_expr(ecx, value)?,
5049 };
5050 keys.push(key);
5051 values.push(value);
5052 }
5053 let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5054 let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5055 let out = itertools::interleave(keys, values).collect();
5056 (value_type, out)
5057 };
5058
5059 if matches!(value_type, SqlScalarType::Char { .. }) {
5060 bail_unsupported!("char map");
5061 }
5062
5063 let expr = HirScalarExpr::call_variadic(MapBuild { value_type }, exprs);
5064 Ok(expr.into())
5065}
5066
5067pub fn coerce_homogeneous_exprs(
5084 ecx: &ExprContext,
5085 exprs: Vec<CoercibleScalarExpr>,
5086 force_type: Option<&SqlScalarType>,
5087) -> Result<Vec<HirScalarExpr>, PlanError> {
5088 assert!(!exprs.is_empty());
5089
5090 let target_holder;
5091 let target = match force_type {
5092 Some(t) => t,
5093 None => {
5094 let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5095 target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5096 &target_holder
5097 }
5098 };
5099
5100 let mut out = Vec::new();
5102 for expr in exprs {
5103 let arg = typeconv::plan_coerce(ecx, expr, target)?;
5104 let ccx = match force_type {
5105 None => CastContext::Implicit,
5106 Some(_) => CastContext::Explicit,
5107 };
5108 match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5109 Ok(expr) => out.push(expr),
5110 Err(_) => sql_bail!(
5111 "{} could not convert type {} to {}",
5112 ecx.name,
5113 ecx.humanize_sql_scalar_type(&ecx.scalar_type(&arg), false),
5114 ecx.humanize_sql_scalar_type(target, false),
5115 ),
5116 }
5117 }
5118 Ok(out)
5119}
5120
5121pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5124 obe: &OrderByExpr<T>,
5125 column: usize,
5126) -> ColumnOrder {
5127 let desc = !obe.asc.unwrap_or(true);
5128 ColumnOrder {
5129 column,
5130 desc,
5131 nulls_last: obe.nulls_last.unwrap_or(!desc),
5134 }
5135}
5136
5137fn plan_function_order_by(
5145 ecx: &ExprContext,
5146 order_by: &[OrderByExpr<Aug>],
5147) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5148 let mut order_by_exprs = vec![];
5149 let mut col_orders = vec![];
5150 {
5151 for (i, obe) in order_by.iter().enumerate() {
5152 let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5156 order_by_exprs.push(expr);
5157 col_orders.push(resolve_desc_and_nulls_last(obe, i));
5158 }
5159 }
5160 Ok((order_by_exprs, col_orders))
5161}
5162
5163fn humanize_or_debug(scx: &StatementContext, name: &ResolvedItemName) -> String {
5168 scx.humanize_resolved_name(name)
5169 .map(|n| n.to_string())
5170 .unwrap_or_else(|_| format!("<error when trying to humanize `{name:?}`>"))
5171}
5172
/// Plans the aggregate function call described by the given `Function` into an
/// `AggregateExpr`.
///
/// The `over` field of the call is ignored here. The `filter`, if any, is folded into the
/// aggregate's input expression, and for order-sensitive aggregates the `ORDER BY` key
/// expressions are packed into that input as well.
///
/// # Errors
///
/// Returns an error if the name does not resolve to an aggregate function, if an empty
/// argument list is given instead of `*`, if planning any sub-expression fails, or if the
/// aggregate's input refers exclusively to outer columns.
fn plan_aggregate_common(
    ecx: &ExprContext,
    Function::<Aug> {
        name,
        args,
        filter,
        over: _,
        distinct,
    }: &Function<Aug>,
) -> Result<AggregateExpr, PlanError> {
    let impls = match resolve_func(ecx, name, args)? {
        Func::Aggregate(impls) => impls,
        _ => bail_internal!("plan_aggregate_common called on non-aggregate function"),
    };

    // `f(*)` is planned as a call with no arguments and no ordering. An explicitly empty
    // argument list `f()` is rejected instead.
    let (args, order_by) = match &args {
        FunctionArgs::Star => (vec![], vec![]),
        FunctionArgs::Args { args, order_by } => {
            if args.is_empty() {
                sql_bail!(
                    "{}(*) must be used to call a parameterless aggregate function",
                    humanize_or_debug(ecx.qcx.scx, name)
                );
            }
            let args = plan_exprs(ecx, args)?;
            (args, order_by.clone())
        }
    };

    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;

    let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
    if let Some(filter) = &filter {
        // `<agg>(<expr>) FILTER (WHERE <cond>)` is planned by replacing the aggregate's input
        // with `if <cond> then <expr> else <identity>`, where `<identity>` is the literal
        // built from `func.identity_datum()`.
        let cond =
            plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
        let expr_typ = ecx.scalar_type(&expr);
        expr = HirScalarExpr::if_then_else(
            cond,
            expr,
            HirScalarExpr::literal(func.identity_datum(), expr_typ),
        );
    }

    // Scan the column references of the input expression: `seen_inner` is set by a level-0
    // reference found at depth 0, `seen_outer` by any reference whose level exceeds the depth
    // at which it is found.
    let mut seen_outer = false;
    let mut seen_inner = false;
    #[allow(deprecated)]
    expr.visit_columns(0, &mut |depth, col| {
        if depth == 0 && col.level == 0 {
            seen_inner = true;
        } else if col.level > depth {
            seen_outer = true;
        }
    });
    if seen_outer && !seen_inner {
        bail_unsupported!(
            3720,
            "aggregate functions that refer exclusively to outer columns"
        );
    }

    // Order-sensitive aggregates receive a record whose first field is the value to aggregate
    // and whose remaining fields are the `ORDER BY` keys. This happens even when no `ORDER BY`
    // was written, in which case the record has a single field. All field names are empty.
    if func.is_order_sensitive() {
        let field_names = iter::repeat(ColumnName::from(""))
            .take(1 + order_by_exprs.len())
            .collect();
        let mut exprs = vec![expr];
        exprs.extend(order_by_exprs);
        expr = HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs);
    }

    Ok(AggregateExpr {
        func,
        expr: Box::new(expr),
        distinct: *distinct,
    })
}
5282
/// Plans a possibly qualified identifier into a column reference or a whole-row expression.
///
/// The last component of `names` is the column (or table) name; any preceding components
/// qualify it. Resolution proceeds as follows:
///
/// 1. A qualified name must resolve to a column of the named table.
/// 2. An unqualified name is first resolved as a column.
/// 3. If no such column exists, the name is looked up as a table in scope and the result is an
///    expression over that table's columns.
///
/// # Errors
///
/// Returns an internal error if `names` is empty, any error produced by scope resolution, and
/// `PlanError::UnknownColumn` if the name matches neither a column nor a table.
fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
    let mut names = names.to_vec();
    let Some(last) = names.pop() else {
        bail_internal!("empty identifier");
    };
    let col_name = normalize::column_name(last);

    // Qualified name: the remaining components name the table and the lookup must succeed.
    if !names.is_empty() {
        let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
        let (i, i_name) = ecx.scope.resolve_table_column(
            &ecx.qcx.outer_scopes,
            &table_name,
            &col_name,
            &mut ecx.qcx.name_manager.borrow_mut(),
        )?;
        return Ok(HirScalarExpr::named_column(i, i_name));
    }

    // Unqualified name: try it as a column first. On `UnknownColumn`, keep the similar names
    // reported by the scope so they can be attached to the final error, and fall through to
    // the table lookup. Any other error is returned as is.
    let similar_names = match ecx.scope.resolve_column(
        &ecx.qcx.outer_scopes,
        &col_name,
        &mut ecx.qcx.name_manager.borrow_mut(),
    ) {
        Ok((i, i_name)) => {
            return Ok(HirScalarExpr::named_column(i, i_name));
        }
        Err(PlanError::UnknownColumn { similar, .. }) => similar,
        Err(e) => return Err(e),
    };

    // Not a column: look the name up as an (unqualified) table in scope.
    let items = ecx.scope.items_from_table(
        &ecx.qcx.outer_scopes,
        &PartialItemName {
            database: None,
            schema: None,
            item: col_name.as_str().to_owned(),
        },
    )?;
    match items.as_slice() {
        // Neither a column nor a table.
        [] => Err(PlanError::UnknownColumn {
            table: None,
            column: col_name,
            similar: similar_names,
        }),
        // A table with exactly one item that is flagged as coming from a single-column
        // function: the reference yields that column directly rather than a record.
        [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
            *column,
            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
        )),
        // Any other table: build a record from its columns.
        _ => {
            // Items flagged as the "exists" column of a table function from the target list
            // are left out of the record; the column is remembered instead (if several are
            // flagged, the last one wins).
            let mut has_exists_column = None;
            let (exprs, field_names): (Vec<_>, Vec<_>) = items
                .into_iter()
                .filter_map(|(column, item)| {
                    if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
                        has_exists_column = Some(column);
                        None
                    } else {
                        let expr = HirScalarExpr::named_column(
                            column,
                            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
                        );
                        let name = item.column_name.clone();
                        Some((expr, name))
                    }
                })
                .unzip();
            // When an exists column was found and exactly one other column remains, that
            // column is used unwrapped instead of being placed in a one-field record.
            let expr = if exprs.len() == 1 && has_exists_column.is_some() {
                exprs.into_element()
            } else {
                HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs)
            };
            // With an exists column, the whole reference evaluates to NULL whenever that
            // column is NULL.
            if let Some(has_exists_column) = has_exists_column {
                Ok(HirScalarExpr::if_then_else(
                    HirScalarExpr::unnamed_column(has_exists_column)
                        .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
                    HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
                    expr,
                ))
            } else {
                Ok(expr)
            }
        }
    }
}
5382
5383fn plan_op(
5384 ecx: &ExprContext,
5385 op: &str,
5386 expr1: &Expr<Aug>,
5387 expr2: Option<&Expr<Aug>>,
5388) -> Result<HirScalarExpr, PlanError> {
5389 let impls = func::resolve_op(op)?;
5390 let args = match expr2 {
5391 None => plan_exprs(ecx, &[expr1])?,
5392 Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5393 };
5394 func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5395}
5396
/// Plans a function call that appears where a scalar expression is expected.
///
/// The resolved function kind decides the path taken:
///
/// * table functions are rejected;
/// * scalar functions fall through to the common tail of this function, which rejects
///   `DISTINCT`, `FILTER`, `*` and `ORDER BY` before selecting an implementation;
/// * scalar window, value window and windowed aggregate functions are planned into a
///   `HirScalarExpr::windowing` expression and returned directly from their arm;
/// * an aggregate without an `OVER` clause is always an error here.
fn plan_function<'a>(
    ecx: &ExprContext,
    f @ Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &'a Function<Aug>,
) -> Result<HirScalarExpr, PlanError> {
    let impls = match resolve_func(ecx, name, args)? {
        Func::Table(_) => {
            sql_bail!(
                "table functions are not allowed in {} (function {})",
                ecx.name,
                name
            );
        }
        Func::Scalar(impls) => {
            if over.is_some() {
                sql_bail!(
                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
                );
            }
            impls
        }
        Func::ScalarWindow(impls) => {
            // The window frame is planned (and validated) but not used by this arm.
            let (
                ignore_nulls,
                order_by_exprs,
                col_orders,
                _window_frame,
                partition_by,
                scalar_args,
            ) = plan_window_function_non_aggr(ecx, f)?;

            // Any argument is reported with a dedicated message when the item name is
            // available. Otherwise the arguments are passed on to `func::select_impl` below.
            if !scalar_args.is_empty() {
                if let ResolvedItemName::Item {
                    full_name: FullItemName { item, .. },
                    ..
                } = name
                {
                    sql_bail!(
                        "function {} has 0 parameters, but was called with {}",
                        item,
                        scalar_args.len()
                    );
                }
            }

            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;

            // IGNORE NULLS is rejected for every scalar window function.
            if ignore_nulls {
                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
            }

            return Ok(HirScalarExpr::windowing(WindowExpr {
                func: WindowExprType::Scalar(ScalarWindowExpr {
                    func,
                    order_by: col_orders,
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::ValueWindow(impls) => {
            let window_plan = plan_window_function_non_aggr(ecx, f)?;
            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, win_args) =
                window_plan;

            let (args_encoded, func) =
                func::select_impl(ecx, FuncSpec::Func(name), impls, win_args, vec![])?;

            // IGNORE NULLS is accepted only for `Lag` and `Lead`.
            if ignore_nulls {
                match func {
                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
                }
            }

            return Ok(HirScalarExpr::windowing(WindowExpr {
                func: WindowExprType::Value(ValueWindowExpr {
                    func,
                    args: Box::new(args_encoded),
                    order_by: col_orders,
                    window_frame,
                    ignore_nulls,
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::Aggregate(_) => {
            if f.over.is_none() {
                // A non-windowed aggregate is never planned by this function. The message
                // depends on whether aggregates are allowed in this context at all.
                if ecx.allow_aggregates {
                    sql_bail!(
                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
                        name,
                    );
                } else {
                    sql_bail!(
                        "aggregate functions are not allowed in {} (function {})",
                        ecx.name,
                        name
                    );
                }
            } else {
                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
                    plan_window_function_common(ecx, &f.name, &f.over)?;

                // Window aggregates reject frames that pair an unbounded bound with an offset
                // bound.
                match (&window_frame.start_bound, &window_frame.end_bound) {
                    (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    ) => bail_unsupported!("mixed unbounded - offset frames"),
                    (_, _) => {}
                }

                // IGNORE NULLS is rejected for every window aggregate.
                if ignore_nulls {
                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
                }

                let aggregate_expr = plan_aggregate_common(ecx, f)?;

                if aggregate_expr.distinct {
                    bail_unsupported!("DISTINCT in window aggregates");
                }

                return Ok(HirScalarExpr::windowing(WindowExpr {
                    func: WindowExprType::Aggregate(AggregateWindowExpr {
                        aggregate_expr,
                        order_by: col_orders,
                        window_frame,
                    }),
                    partition_by,
                    order_by: order_by_exprs,
                }));
            }
        }
    };

    // Only the `Func::Scalar` arm reaches this point, and it has already rejected an OVER
    // clause, so this is a defensive check.
    if over.is_some() {
        bail_internal!("OVER clause should have been handled by the window function path above");
    }

    if *distinct {
        sql_bail!(
            "DISTINCT specified, but {} is not an aggregate function",
            humanize_or_debug(ecx.qcx.scx, name)
        );
    }
    if filter.is_some() {
        sql_bail!(
            "FILTER specified, but {} is not an aggregate function",
            humanize_or_debug(ecx.qcx.scx, name)
        );
    }

    let scalar_args = match &args {
        FunctionArgs::Star => {
            sql_bail!(
                "* argument is invalid with non-aggregate function {}",
                humanize_or_debug(ecx.qcx.scx, name)
            )
        }
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    humanize_or_debug(ecx.qcx.scx, name)
                );
            }
            plan_exprs(ecx, args)?
        }
    };

    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
}
5606
/// Feature description passed to `bail_unsupported!` when `IGNORE NULLS` is requested for a
/// window function that does not support it.
pub const IGNORE_NULLS_ERROR_MSG: &str =
    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5609
5610pub fn resolve_func(
5614 ecx: &ExprContext,
5615 name: &ResolvedItemName,
5616 args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5617) -> Result<&'static Func, PlanError> {
5618 if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5619 if let Ok(f) = i.func() {
5620 return Ok(f);
5621 }
5622 }
5623
5624 let cexprs = match args {
5627 mz_sql_parser::ast::FunctionArgs::Star => vec![],
5628 mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5629 if !order_by.is_empty() {
5630 sql_bail!(
5631 "ORDER BY specified, but {} is not an aggregate function",
5632 name
5633 );
5634 }
5635 plan_exprs(ecx, args)?
5636 }
5637 };
5638
5639 let arg_types: Vec<_> = cexprs
5640 .into_iter()
5641 .map(|ty| match ecx.scalar_type(&ty) {
5642 CoercibleScalarType::Coerced(ty) => ecx.humanize_sql_scalar_type(&ty, false),
5643 CoercibleScalarType::Record(_) => "record".to_string(),
5644 CoercibleScalarType::Uncoerced => "unknown".to_string(),
5645 })
5646 .collect();
5647
5648 Err(PlanError::UnknownFunction {
5649 name: name.to_string(),
5650 arg_types,
5651 })
5652}
5653
5654fn plan_is_expr<'a>(
5655 ecx: &ExprContext,
5656 expr: &'a Expr<Aug>,
5657 construct: &IsExprConstruct<Aug>,
5658 not: bool,
5659) -> Result<HirScalarExpr, PlanError> {
5660 let expr_hir = plan_expr(ecx, expr)?;
5661
5662 let mut result = match construct {
5663 IsExprConstruct::Null => {
5664 expr_hir.type_as_any(ecx)?.call_is_null()
5669 }
5670 IsExprConstruct::Unknown => expr_hir.type_as(ecx, &SqlScalarType::Bool)?.call_is_null(),
5671 IsExprConstruct::True => expr_hir
5672 .type_as(ecx, &SqlScalarType::Bool)?
5673 .call_unary(UnaryFunc::IsTrue(expr_func::IsTrue)),
5674 IsExprConstruct::False => expr_hir
5675 .type_as(ecx, &SqlScalarType::Bool)?
5676 .call_unary(UnaryFunc::IsFalse(expr_func::IsFalse)),
5677 IsExprConstruct::DistinctFrom(expr2) => {
5678 let ne_ast = expr.clone().not_equals(expr2.as_ref().clone());
5689 let ne_hir = plan_expr(ecx, &ne_ast)?.type_as_any(ecx)?;
5690
5691 let expr1_hir = expr_hir.type_as_any(ecx)?;
5692 let expr2_hir = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5693
5694 let term1 = HirScalarExpr::variadic_or(vec![
5695 ne_hir,
5696 expr1_hir.clone().call_is_null(),
5697 expr2_hir.clone().call_is_null(),
5698 ]);
5699 let term2 = HirScalarExpr::variadic_or(vec![
5700 expr1_hir.call_is_null().not(),
5701 expr2_hir.call_is_null().not(),
5702 ]);
5703 term1.and(term2)
5704 }
5705 };
5706 if not {
5707 result = result.not();
5708 }
5709 Ok(result)
5710}
5711
5712fn plan_case<'a>(
5713 ecx: &ExprContext,
5714 operand: &'a Option<Box<Expr<Aug>>>,
5715 conditions: &'a [Expr<Aug>],
5716 results: &'a [Expr<Aug>],
5717 else_result: &'a Option<Box<Expr<Aug>>>,
5718) -> Result<HirScalarExpr, PlanError> {
5719 let mut cond_exprs = Vec::new();
5720 let mut result_exprs = Vec::new();
5721 for (c, r) in conditions.iter().zip_eq(results) {
5722 let c = match operand {
5723 Some(operand) => operand.clone().equals(c.clone()),
5724 None => c.clone(),
5725 };
5726 let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5727 cond_exprs.push(cexpr);
5728 result_exprs.push(r);
5729 }
5730 result_exprs.push(match else_result {
5731 Some(else_result) => else_result,
5732 None => &Expr::Value(Value::Null),
5733 });
5734 let mut result_exprs = coerce_homogeneous_exprs(
5735 &ecx.with_name("CASE"),
5736 plan_exprs(ecx, &result_exprs)?,
5737 None,
5738 )?;
5739 let mut expr = result_exprs.pop().unwrap();
5740 assert_eq!(cond_exprs.len(), result_exprs.len());
5741 for (cexpr, rexpr) in cond_exprs
5742 .into_iter()
5743 .rev()
5744 .zip_eq(result_exprs.into_iter().rev())
5745 {
5746 expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5747 }
5748 Ok(expr)
5749}
5750
5751fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5752 let (datum, scalar_type) = match l {
5753 Value::Number(s) => {
5754 let d = strconv::parse_numeric(s.as_str())?;
5755 if !s.contains(&['E', '.'][..]) {
5756 if let Ok(n) = d.0.try_into() {
5758 (Datum::Int32(n), SqlScalarType::Int32)
5759 } else if let Ok(n) = d.0.try_into() {
5760 (Datum::Int64(n), SqlScalarType::Int64)
5761 } else {
5762 (
5763 Datum::Numeric(d),
5764 SqlScalarType::Numeric { max_scale: None },
5765 )
5766 }
5767 } else {
5768 (
5769 Datum::Numeric(d),
5770 SqlScalarType::Numeric { max_scale: None },
5771 )
5772 }
5773 }
5774 Value::HexString(_) => bail_unsupported!("hex string literals"),
5775 Value::Boolean(b) => match b {
5776 false => (Datum::False, SqlScalarType::Bool),
5777 true => (Datum::True, SqlScalarType::Bool),
5778 },
5779 Value::Interval(i) => {
5780 let i = literal::plan_interval(i)?;
5781 (Datum::Interval(i), SqlScalarType::Interval)
5782 }
5783 Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5784 Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5785 };
5786 let expr = HirScalarExpr::literal(datum, scalar_type);
5787 Ok(expr.into())
5788}
5789
5790fn plan_window_function_non_aggr<'a>(
5793 ecx: &ExprContext,
5794 Function {
5795 name,
5796 args,
5797 filter,
5798 over,
5799 distinct,
5800 }: &'a Function<Aug>,
5801) -> Result<
5802 (
5803 bool,
5804 Vec<HirScalarExpr>,
5805 Vec<ColumnOrder>,
5806 mz_expr::WindowFrame,
5807 Vec<HirScalarExpr>,
5808 Vec<CoercibleScalarExpr>,
5809 ),
5810 PlanError,
5811> {
5812 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5813 plan_window_function_common(ecx, name, over)?;
5814
5815 if *distinct {
5816 sql_bail!(
5817 "DISTINCT specified, but {} is not an aggregate function",
5818 name
5819 );
5820 }
5821
5822 if filter.is_some() {
5823 bail_unsupported!("FILTER in non-aggregate window functions");
5824 }
5825
5826 let scalar_args = match &args {
5827 FunctionArgs::Star => {
5828 sql_bail!("* argument is invalid with non-aggregate function {}", name)
5829 }
5830 FunctionArgs::Args { args, order_by } => {
5831 if !order_by.is_empty() {
5832 sql_bail!(
5833 "ORDER BY specified, but {} is not an aggregate function",
5834 name
5835 );
5836 }
5837 plan_exprs(ecx, args)?
5838 }
5839 };
5840
5841 Ok((
5842 ignore_nulls,
5843 order_by_exprs,
5844 col_orders,
5845 window_frame,
5846 partition,
5847 scalar_args,
5848 ))
5849}
5850
5851fn plan_window_function_common(
5853 ecx: &ExprContext,
5854 name: &<Aug as AstInfo>::ItemName,
5855 over: &Option<WindowSpec<Aug>>,
5856) -> Result<
5857 (
5858 bool,
5859 Vec<HirScalarExpr>,
5860 Vec<ColumnOrder>,
5861 mz_expr::WindowFrame,
5862 Vec<HirScalarExpr>,
5863 ),
5864 PlanError,
5865> {
5866 if !ecx.allow_windows {
5867 sql_bail!(
5868 "window functions are not allowed in {} (function {})",
5869 ecx.name,
5870 name
5871 );
5872 }
5873
5874 let window_spec = match over.as_ref() {
5875 Some(over) => over,
5876 None => sql_bail!("window function {} requires an OVER clause", name),
5877 };
5878 if window_spec.ignore_nulls && window_spec.respect_nulls {
5879 sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5880 }
5881 let window_frame = match window_spec.window_frame.as_ref() {
5882 Some(frame) => plan_window_frame(frame)?,
5883 None => mz_expr::WindowFrame::default(),
5884 };
5885 let mut partition = Vec::new();
5886 for expr in &window_spec.partition_by {
5887 partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5888 }
5889
5890 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5891
5892 Ok((
5893 window_spec.ignore_nulls,
5894 order_by_exprs,
5895 col_orders,
5896 window_frame,
5897 partition,
5898 ))
5899}
5900
5901fn plan_window_frame(
5902 WindowFrame {
5903 units,
5904 start_bound,
5905 end_bound,
5906 }: &WindowFrame,
5907) -> Result<mz_expr::WindowFrame, PlanError> {
5908 use mz_expr::WindowFrameBound::*;
5909 let units = window_frame_unit_ast_to_expr(units)?;
5910 let start_bound = window_frame_bound_ast_to_expr(start_bound);
5911 let end_bound = end_bound
5912 .as_ref()
5913 .map(window_frame_bound_ast_to_expr)
5914 .unwrap_or(CurrentRow);
5915
5916 match (&start_bound, &end_bound) {
5918 (UnboundedFollowing, _) => {
5920 sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5921 }
5922 (_, UnboundedPreceding) => {
5924 sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5925 }
5926 (CurrentRow, OffsetPreceding(_)) => {
5928 sql_bail!("frame starting from current row cannot have preceding rows")
5929 }
5930 (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5931 sql_bail!("frame starting from following row cannot have preceding rows")
5932 }
5933 (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5936 if *o1 > 1000000 || *o2 > 1000000 {
5940 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5941 }
5942 }
5943 (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5944 if *o1 > 1000000 || *o2 > 1000000 {
5945 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5946 }
5947 }
5948 (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5949 if *o1 > 1000000 || *o2 > 1000000 {
5950 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5951 }
5952 }
5953 (OffsetPreceding(o), CurrentRow) => {
5954 if *o > 1000000 {
5955 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5956 }
5957 }
5958 (CurrentRow, OffsetFollowing(o)) => {
5959 if *o > 1000000 {
5960 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5961 }
5962 }
5963 (_, _) => (),
5965 }
5966
5967 if units == mz_expr::WindowFrameUnits::Range
5970 && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5971 {
5972 bail_unsupported!("RANGE in non-default window frames")
5973 }
5974
5975 let frame = mz_expr::WindowFrame {
5976 units,
5977 start_bound,
5978 end_bound,
5979 };
5980 Ok(frame)
5981}
5982
5983fn window_frame_unit_ast_to_expr(
5984 unit: &WindowFrameUnits,
5985) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5986 match unit {
5987 WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5988 WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5989 WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5990 }
5991}
5992
5993fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5994 match bound {
5995 WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5996 WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5997 WindowFrameBound::Preceding(Some(offset)) => {
5998 mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5999 }
6000 WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
6001 WindowFrameBound::Following(Some(offset)) => {
6002 mz_expr::WindowFrameBound::OffsetFollowing(*offset)
6003 }
6004 }
6005}
6006
6007pub fn scalar_type_from_sql(
6008 scx: &StatementContext,
6009 data_type: &ResolvedDataType,
6010) -> Result<SqlScalarType, PlanError> {
6011 match data_type {
6012 ResolvedDataType::AnonymousList(elem_type) => {
6013 let elem_type = scalar_type_from_sql(scx, elem_type)?;
6014 if matches!(elem_type, SqlScalarType::Char { .. }) {
6015 bail_unsupported!("char list");
6016 }
6017 Ok(SqlScalarType::List {
6018 element_type: Box::new(elem_type),
6019 custom_id: None,
6020 })
6021 }
6022 ResolvedDataType::AnonymousMap {
6023 key_type,
6024 value_type,
6025 } => {
6026 match scalar_type_from_sql(scx, key_type)? {
6027 SqlScalarType::String => {}
6028 other => sql_bail!(
6029 "map key type must be {}, got {}",
6030 scx.humanize_sql_scalar_type(&SqlScalarType::String, false),
6031 scx.humanize_sql_scalar_type(&other, false)
6032 ),
6033 }
6034 Ok(SqlScalarType::Map {
6035 value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
6036 custom_id: None,
6037 })
6038 }
6039 ResolvedDataType::Named { id, modifiers, .. } => {
6040 scalar_type_from_catalog(scx.catalog, *id, modifiers)
6041 }
6042 ResolvedDataType::Error => bail_internal!("should have been caught in name resolution"),
6043 }
6044}
6045
/// Converts the catalog type identified by `id`, together with its type `modifiers`, into a
/// `SqlScalarType`.
///
/// Only `numeric`, `char`, `varchar`, `timestamp` and `timestamptz` accept modifiers; every
/// other type rejects a non-empty `modifiers` slice. List, map and record types are converted
/// recursively and record `id` as their custom id.
///
/// # Errors
///
/// Returns an error if `id` does not refer to a type, if the modifiers are invalid for the
/// type, if the type is a pseudo type, or if converting a nested type fails.
pub fn scalar_type_from_catalog(
    catalog: &dyn SessionCatalog,
    id: CatalogItemId,
    modifiers: &[i64],
) -> Result<SqlScalarType, PlanError> {
    let entry = catalog.get_item(&id);
    let type_details = match entry.type_details() {
        Some(type_details) => type_details,
        None => {
            // The item exists but is not a type; this is reported as an error rather than a
            // panic.
            sql_bail!(
                "internal error: {} does not refer to a type",
                catalog.resolve_full_name(entry.name()).to_string().quoted()
            );
        }
    };
    match &type_details.typ {
        CatalogType::Numeric => {
            // Modifiers are `(precision[, scale])`. Precision is validated but only the scale
            // is kept in the resulting type.
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
                    sql_bail!(
                        "precision for type numeric must be between 1 and {}",
                        NUMERIC_DATUM_MAX_PRECISION,
                    );
                }
                Some(p) => Some(*p),
                None => None,
            };
            let scale = match modifiers.next() {
                Some(scale) => {
                    if let Some(precision) = precision {
                        if *scale > precision {
                            sql_bail!(
                                "scale for type numeric must be between 0 and precision {}",
                                precision
                            );
                        }
                    }
                    Some(NumericMaxScale::try_from(*scale)?)
                }
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type numeric supports at most two type modifiers");
            }
            Ok(SqlScalarType::Numeric { max_scale: scale })
        }
        CatalogType::Char => {
            // A `char` without a length modifier has length one.
            let mut modifiers = modifiers.iter().fuse();
            let length = match modifiers.next() {
                Some(l) => Some(CharLength::try_from(*l)?),
                None => Some(CharLength::ONE),
            };
            if modifiers.next().is_some() {
                sql_bail!("type character supports at most one type modifier");
            }
            Ok(SqlScalarType::Char { length })
        }
        CatalogType::VarChar => {
            // A `varchar` without a length modifier has no maximum length.
            let mut modifiers = modifiers.iter().fuse();
            let length = match modifiers.next() {
                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type character varying supports at most one type modifier");
            }
            Ok(SqlScalarType::VarChar { max_length: length })
        }
        CatalogType::Timestamp => {
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) => Some(TimestampPrecision::try_from(*p)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type timestamp supports at most one type modifier");
            }
            Ok(SqlScalarType::Timestamp { precision })
        }
        CatalogType::TimestampTz => {
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) => Some(TimestampPrecision::try_from(*p)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type timestamp with time zone supports at most one type modifier");
            }
            Ok(SqlScalarType::TimestampTz { precision })
        }
        t => {
            // None of the remaining types accept modifiers.
            if !modifiers.is_empty() {
                sql_bail!(
                    "{} does not support type modifiers",
                    catalog.resolve_full_name(entry.name()).to_string()
                );
            }
            match t {
                // `modifiers` is known to be empty here, so the element type is converted
                // without modifiers.
                CatalogType::Array {
                    element_reference: element_id,
                } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
                    catalog,
                    *element_id,
                    modifiers,
                )?))),
                CatalogType::List {
                    element_reference: element_id,
                    element_modifiers,
                } => Ok(SqlScalarType::List {
                    element_type: Box::new(scalar_type_from_catalog(
                        catalog,
                        *element_id,
                        element_modifiers,
                    )?),
                    custom_id: Some(id),
                }),
                // Only the value type is carried into the resulting map type; the key
                // reference and key modifiers are ignored.
                CatalogType::Map {
                    key_reference: _,
                    key_modifiers: _,
                    value_reference: value_id,
                    value_modifiers,
                } => Ok(SqlScalarType::Map {
                    value_type: Box::new(scalar_type_from_catalog(
                        catalog,
                        *value_id,
                        value_modifiers,
                    )?),
                    custom_id: Some(id),
                }),
                CatalogType::Range {
                    element_reference: element_id,
                } => Ok(SqlScalarType::Range {
                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
                }),
                CatalogType::Record { fields } => {
                    // Every record field is converted with its own modifiers and is marked
                    // nullable.
                    let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
                        .iter()
                        .map(|f| {
                            let scalar_type = scalar_type_from_catalog(
                                catalog,
                                f.type_reference,
                                &f.type_modifiers,
                            )?;
                            Ok((
                                f.name.clone(),
                                SqlColumnType {
                                    scalar_type,
                                    nullable: true,
                                },
                            ))
                        })
                        .collect::<Result<Box<_>, PlanError>>()?;
                    Ok(SqlScalarType::Record {
                        fields: scalars,
                        custom_id: Some(id),
                    })
                }
                CatalogType::AclItem => Ok(SqlScalarType::AclItem),
                CatalogType::Bool => Ok(SqlScalarType::Bool),
                CatalogType::Bytes => Ok(SqlScalarType::Bytes),
                CatalogType::Date => Ok(SqlScalarType::Date),
                CatalogType::Float32 => Ok(SqlScalarType::Float32),
                CatalogType::Float64 => Ok(SqlScalarType::Float64),
                CatalogType::Int16 => Ok(SqlScalarType::Int16),
                CatalogType::Int32 => Ok(SqlScalarType::Int32),
                CatalogType::Int64 => Ok(SqlScalarType::Int64),
                CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
                CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
                CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
                CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
                CatalogType::Interval => Ok(SqlScalarType::Interval),
                CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
                CatalogType::Oid => Ok(SqlScalarType::Oid),
                CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
                CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
                CatalogType::Pseudo => {
                    sql_bail!(
                        "cannot reference pseudo type {}",
                        catalog.resolve_full_name(entry.name()).to_string()
                    )
                }
                CatalogType::RegClass => Ok(SqlScalarType::RegClass),
                CatalogType::RegProc => Ok(SqlScalarType::RegProc),
                CatalogType::RegType => Ok(SqlScalarType::RegType),
                CatalogType::String => Ok(SqlScalarType::String),
                CatalogType::Time => Ok(SqlScalarType::Time),
                CatalogType::Uuid => Ok(SqlScalarType::Uuid),
                CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
                CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
                // These variants are matched by the dedicated arms of the outer `match`; they
                // are listed only to keep this inner `match` exhaustive.
                CatalogType::Numeric => unreachable!("handled above"),
                CatalogType::Char => unreachable!("handled above"),
                CatalogType::VarChar => unreachable!("handled above"),
                CatalogType::Timestamp => unreachable!("handled above"),
                CatalogType::TimestampTz => unreachable!("handled above"),
            }
        }
    }
}
6247
/// AST visitor that collects non-windowed aggregate function calls and, inside select items,
/// replaces table function calls with generated identifiers.
struct AggregateTableFuncVisitor<'a> {
    /// Statement context used to look up the catalog item behind a function name.
    scx: &'a StatementContext<'a>,
    /// Aggregate function calls without an `OVER` clause, in visit order (may contain
    /// duplicates until `into_result` is called).
    aggs: Vec<Function<Aug>>,
    /// Whether the visitor is currently inside the arguments of an aggregate function call.
    within_aggregate: bool,
    /// Table function calls that were replaced, mapped to the identifier that replaced them.
    tables: BTreeMap<Function<Aug>, String>,
    /// Stack of descriptions of enclosing constructs in which a table function is rejected.
    /// The innermost entry is used in the error message.
    table_disallowed_context: Vec<&'static str>,
    /// Whether the visitor is currently inside a select item.
    in_select_item: bool,
    /// Source of the numeric suffixes of generated table function identifiers.
    id_gen: IdGen,
    /// Error recorded during the traversal, if any; reported by `into_result`.
    err: Option<PlanError>,
}
6260
6261impl<'a> AggregateTableFuncVisitor<'a> {
6262 fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6263 AggregateTableFuncVisitor {
6264 scx,
6265 aggs: Vec::new(),
6266 within_aggregate: false,
6267 tables: BTreeMap::new(),
6268 table_disallowed_context: Vec::new(),
6269 in_select_item: false,
6270 id_gen: Default::default(),
6271 err: None,
6272 }
6273 }
6274
6275 fn into_result(
6276 self,
6277 ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6278 match self.err {
6279 Some(err) => Err(err),
6280 None => {
6281 let mut seen = BTreeSet::new();
6284 let aggs = self
6285 .aggs
6286 .into_iter()
6287 .filter(move |agg| seen.insert(agg.clone()))
6288 .collect();
6289 Ok((aggs, self.tables))
6290 }
6291 }
6292 }
6293}
6294
impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
    /// Records non-windowed aggregate calls and maintains the context stack that forbids
    /// table functions inside aggregates and inside other table functions.
    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
        let item = match self.scx.get_item_by_resolved_name(&func.name) {
            Ok(i) => i,
            // Unknown functions are skipped here without recording an error.
            Err(_) => return,
        };

        match item.func() {
            // Aggregates with an `OVER` clause are not collected; they fall through to the
            // default arm below.
            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
                if self.within_aggregate {
                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
                    return;
                }
                self.aggs.push(func.clone());
                let Function {
                    name: _,
                    args,
                    filter,
                    over: _,
                    distinct: _,
                } = func;
                // The filter is visited before `within_aggregate` is set and before the
                // disallowed context is pushed.
                if let Some(filter) = filter {
                    self.visit_expr_mut(filter);
                }
                let old_within_aggregate = self.within_aggregate;
                self.within_aggregate = true;
                self.table_disallowed_context
                    .push("aggregate function calls");

                self.visit_function_args_mut(args);

                self.within_aggregate = old_within_aggregate;
                self.table_disallowed_context.pop();
            }
            Ok(Func::Table { .. }) => {
                // Table functions may not appear in the arguments of another table function.
                self.table_disallowed_context.push("other table functions");
                visit_mut::visit_function_mut(self, func);
                self.table_disallowed_context.pop();
            }
            _ => visit_mut::visit_function_mut(self, func),
        }
    }

    /// Intentionally empty: the visitor does not descend into nested queries.
    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
    }

    /// Visits an expression, replacing a table function call inside a select item with a
    /// generated identifier and recording an error when a table function appears in a
    /// disallowed context.
    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
        // `disallowed_context` is the context this expression itself imposes on its children.
        // `func` is a copy of the table function call, if `expr` is one in a select item.
        let (disallowed_context, func) = match expr {
            Expr::Case { .. } => (Some("CASE"), None),
            Expr::HomogenizingFunction {
                function: HomogenizingFunction::Coalesce,
                ..
            } => (Some("COALESCE"), None),
            Expr::Function(func) if self.in_select_item => {
                let mut table_func = None;
                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
                    if let Ok(Func::Table { .. }) = item.func() {
                        // A table function nested in a disallowed construct is an error; the
                        // innermost context is named in the message.
                        if let Some(context) = self.table_disallowed_context.last() {
                            self.err = Some(sql_err!(
                                "table functions are not allowed in {} (function {})",
                                context,
                                func.name
                            ));
                            return;
                        }
                        table_func = Some(func.clone());
                    }
                }
                (None, table_func)
            }
            _ => (None, None),
        };
        if let Some(func) = func {
            // Visit the table function call before it is replaced, since the replacement
            // below discards the original expression.
            visit_mut::visit_expr_mut(self, expr);
            // Only plain calls (no FILTER, no OVER, no DISTINCT) are replaced.
            if let Function {
                name: _,
                args: _,
                filter: None,
                over: None,
                distinct: false,
            } = &func
            {
                // The id is allocated before the map lookup, so it is consumed even when an
                // identical call is already present and its existing identifier is reused.
                let unique_id = self.id_gen.allocate_id();
                let id = self
                    .tables
                    .entry(func)
                    .or_insert_with(|| format!("table_func_{unique_id}"));
                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
            }
        }
        if let Some(context) = disallowed_context {
            self.table_disallowed_context.push(context);
        }

        visit_mut::visit_expr_mut(self, expr);

        if disallowed_context.is_some() {
            self.table_disallowed_context.pop();
        }
    }

    /// Marks the traversal as being inside a select item for the duration of the visit and
    /// restores the previous flag afterwards.
    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
        let old = self.in_select_item;
        self.in_select_item = true;
        visit_mut::visit_select_item_mut(self, si);
        self.in_select_item = old;
    }
}
6416
6417#[derive(Default)]
6418struct WindowFuncCollector {
6419 window_funcs: Vec<Expr<Aug>>,
6420}
6421
6422impl WindowFuncCollector {
6423 fn into_result(self) -> Vec<Expr<Aug>> {
6424 let mut seen = BTreeSet::new();
6426 let window_funcs_dedupped = self
6427 .window_funcs
6428 .into_iter()
6429 .filter(move |expr| seen.insert(expr.clone()))
6430 .rev()
6433 .collect();
6434 window_funcs_dedupped
6435 }
6436}
6437
6438impl Visit<'_, Aug> for WindowFuncCollector {
6439 fn visit_expr(&mut self, expr: &Expr<Aug>) {
6440 match expr {
6441 Expr::Function(func) => {
6442 if func.over.is_some() {
6443 self.window_funcs.push(expr.clone());
6444 }
6445 }
6446 _ => (),
6447 }
6448 visit::visit_expr(self, expr);
6449 }
6450
6451 fn visit_query(&mut self, _query: &Query<Aug>) {
6452 }
6454}
6455
/// The kind of lifetime a planned query has.
///
/// The variants are distinguished by the predicates below; see
/// [`QueryLifetime::is_one_shot`], [`QueryLifetime::is_maintained`] and
/// [`QueryLifetime::allow_show`].
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum QueryLifetime {
    /// The only lifetime that is one-shot rather than maintained.
    OneShot,
    /// Maintained lifetime; named for indexes.
    Index,
    /// Maintained lifetime; named for materialized views.
    MaterializedView,
    /// Maintained lifetime; the only maintained one for which `allow_show`
    /// returns `true`.
    Subscribe,
    /// Maintained lifetime; named for views.
    View,
    /// Maintained lifetime; named for sources.
    Source,
}

impl QueryLifetime {
    /// Returns `true` only for [`QueryLifetime::OneShot`].
    ///
    /// # Panics
    ///
    /// Panics if the answer is not the exact negation of
    /// [`QueryLifetime::is_maintained`]; the assertion keeps the two tables
    /// from drifting apart.
    pub fn is_one_shot(&self) -> bool {
        let one_shot = match self {
            Self::OneShot => true,
            Self::Index
            | Self::MaterializedView
            | Self::Subscribe
            | Self::View
            | Self::Source => false,
        };
        assert_eq!(!one_shot, self.is_maintained());
        one_shot
    }

    /// Returns `true` for every lifetime except [`QueryLifetime::OneShot`].
    pub fn is_maintained(&self) -> bool {
        match self {
            Self::OneShot => false,
            Self::Index
            | Self::MaterializedView
            | Self::Subscribe
            | Self::View
            | Self::Source => true,
        }
    }

    /// Returns `true` for [`QueryLifetime::OneShot`] and
    /// [`QueryLifetime::Subscribe`], and `false` for every other lifetime.
    ///
    /// NOTE(review): presumably gates `SHOW` commands; confirm against callers.
    pub fn allow_show(&self) -> bool {
        match self {
            Self::OneShot | Self::Subscribe => true,
            Self::Index | Self::MaterializedView | Self::View | Self::Source => false,
        }
    }
}
6515
/// Describes a common table expression (CTE) by its name and relation
/// description.
#[derive(Debug, Clone)]
pub struct CteDesc {
    /// The name of the CTE.
    pub name: String,
    /// The relation description of the CTE: its column names and types.
    pub desc: RelationDesc,
}
6522
/// The state carried while planning a query.
#[derive(Debug, Clone)]
pub struct QueryContext<'a> {
    /// The context of the statement that contains the query.
    pub scx: &'a StatementContext<'a>,
    /// The lifetime of the query being planned.
    pub lifetime: QueryLifetime,
    /// The scopes of the enclosing relation expressions. `derived_context`
    /// places the newest scope at the front.
    pub outer_scopes: Vec<Scope>,
    /// The relation types of the enclosing relation expressions, kept in the
    /// same order as `outer_scopes`.
    pub outer_relation_types: Vec<SqlRelationType>,
    /// The CTEs known to this query, keyed by their local id.
    pub ctes: BTreeMap<LocalId, CteDesc>,
    /// The name interner. It is reference-counted, and derived contexts share
    /// the same instance.
    pub name_manager: Rc<RefCell<NameManager>>,
    /// The guard returned by this type's `CheckedRecursion` implementation.
    pub recursion_guard: RecursionGuard,
}
6540
/// Lets a `QueryContext` be used for checked recursion.
impl CheckedRecursion for QueryContext<'_> {
    /// Returns the guard stored in this context.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
6546
6547impl<'a> QueryContext<'a> {
6548 pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6549 QueryContext {
6550 scx,
6551 lifetime,
6552 outer_scopes: vec![],
6553 outer_relation_types: vec![],
6554 ctes: BTreeMap::new(),
6555 name_manager: Rc::new(RefCell::new(NameManager::new())),
6556 recursion_guard: RecursionGuard::with_limit(1024), }
6558 }
6559
6560 fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6561 expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6562 }
6563
6564 fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6567 let ctes = self.ctes.clone();
6568 let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6569 let outer_relation_types = iter::once(relation_type)
6570 .chain(self.outer_relation_types.clone())
6571 .collect();
6572 let name_manager = Rc::clone(&self.name_manager);
6574
6575 QueryContext {
6576 scx: self.scx,
6577 lifetime: self.lifetime,
6578 outer_scopes,
6579 outer_relation_types,
6580 ctes,
6581 name_manager,
6582 recursion_guard: self.recursion_guard.clone(),
6583 }
6584 }
6585
6586 fn empty_derived_context(&self) -> QueryContext<'a> {
6588 let scope = Scope::empty();
6589 let ty = SqlRelationType::empty();
6590 self.derived_context(scope, ty)
6591 }
6592
6593 pub fn resolve_table_name(
6596 &self,
6597 object: ResolvedItemName,
6598 ) -> Result<(HirRelationExpr, Scope), PlanError> {
6599 match object {
6600 ResolvedItemName::Item {
6601 id,
6602 full_name,
6603 version,
6604 ..
6605 } => {
6606 let item = self.scx.get_item(&id).at_version(version);
6607 let desc = match item.relation_desc() {
6608 Some(desc) => desc.clone(),
6609 None => {
6610 return Err(PlanError::InvalidDependency {
6611 name: full_name.to_string(),
6612 item_type: item.item_type().to_string(),
6613 });
6614 }
6615 };
6616 let expr = HirRelationExpr::Get {
6617 id: Id::Global(item.global_id()),
6618 typ: desc.typ().clone(),
6619 };
6620
6621 let name = full_name.into();
6622 let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6623
6624 Ok((expr, scope))
6625 }
6626 ResolvedItemName::Cte { id, name } => {
6627 let name = name.into();
6628 let cte = self.ctes.get(&id).unwrap();
6629 let expr = HirRelationExpr::Get {
6630 id: Id::Local(id),
6631 typ: cte.desc.typ().clone(),
6632 };
6633
6634 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6635
6636 Ok((expr, scope))
6637 }
6638 ResolvedItemName::Error => bail_internal!("should have been caught in name resolution"),
6639 }
6640 }
6641
6642 pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6645 self.scx.humanize_sql_scalar_type(typ, postgres_compat)
6646 }
6647}
6648
/// The state carried while planning an expression.
#[derive(Debug, Clone)]
pub struct ExprContext<'a> {
    /// The context of the query that contains the expression.
    pub qcx: &'a QueryContext<'a>,
    /// A name for this context.
    /// NOTE(review): presumably used in error messages; confirm against callers.
    pub name: &'a str,
    /// The scope of the relation the expression is planned against.
    pub scope: &'a Scope,
    /// The type of the relation the expression is planned against; passed
    /// as the inner relation type when computing expression types.
    pub relation_type: &'a SqlRelationType,
    /// Whether aggregate functions are allowed in this context.
    pub allow_aggregates: bool,
    /// Whether subqueries are allowed in this context.
    pub allow_subqueries: bool,
    /// Whether query parameters are allowed in this context.
    pub allow_parameters: bool,
    /// Whether window functions are allowed in this context.
    pub allow_windows: bool,
}
6670
/// Lets an `ExprContext` be used for checked recursion.
impl CheckedRecursion for ExprContext<'_> {
    /// Returns the guard of the enclosing query context, so expression and
    /// query planning are checked against the same guard.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.qcx.recursion_guard
    }
}
6676
6677impl<'a> ExprContext<'a> {
6678 pub fn catalog(&self) -> &dyn SessionCatalog {
6679 self.qcx.scx.catalog
6680 }
6681
6682 pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6683 let mut ecx = self.clone();
6684 ecx.name = name;
6685 ecx
6686 }
6687
6688 pub fn column_type<E>(&self, expr: &E) -> E::Type
6689 where
6690 E: AbstractExpr,
6691 {
6692 expr.typ(
6693 &self.qcx.outer_relation_types,
6694 self.relation_type,
6695 &self.qcx.scx.param_types.borrow(),
6696 )
6697 }
6698
6699 pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6700 where
6701 E: AbstractExpr,
6702 {
6703 self.column_type(expr).scalar_type()
6704 }
6705
6706 fn derived_query_context(&self) -> QueryContext<'_> {
6707 let mut scope = self.scope.clone();
6708 scope.lateral_barrier = true;
6709 self.qcx.derived_context(scope, self.relation_type.clone())
6710 }
6711
6712 pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6713 self.qcx.scx.require_feature_flag(flag)
6714 }
6715
6716 pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6717 &self.qcx.scx.param_types
6718 }
6719
6720 pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6723 self.qcx.scx.humanize_sql_scalar_type(typ, postgres_compat)
6724 }
6725
6726 pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6727 self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6728 }
6729}
6730
6731#[derive(Debug, Clone)]
6737pub struct NameManager(BTreeSet<Arc<str>>);
6738
6739impl NameManager {
6740 pub fn new() -> Self {
6742 Self(BTreeSet::new())
6743 }
6744
6745 fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6748 let s = s.as_ref();
6749 if let Some(interned) = self.0.get(s) {
6750 Arc::clone(interned)
6751 } else {
6752 let interned: Arc<str> = Arc::from(s);
6753 self.0.insert(Arc::clone(&interned));
6754 interned
6755 }
6756 }
6757
6758 pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6761 self.intern(item.column_name.as_str())
6777 }
6778}
6779
#[cfg(test)]
mod test {
    use super::*;

    /// Equal strings must intern to the same allocation, whatever buffer the
    /// input came from, and distinct strings to different ones.
    #[mz_ore::test]
    pub fn test_name_manager_string_interning() {
        let mut names = NameManager::new();

        let literal_hi = "hi";
        let first_hi = names.intern(literal_hi);
        let hello = names.intern("hello");

        // Different contents, different allocations.
        assert_ne!(first_hi.as_ptr(), hello.as_ptr());

        // Same contents, same allocation.
        let second_hi = names.intern("hi");
        assert_eq!(first_hi.as_ptr(), second_hi.as_ptr());

        // Assemble "hi" at runtime so the input lives in a buffer of its own.
        let mut runtime_hi = String::new();
        runtime_hi.push(first_hi.chars().next().unwrap());
        runtime_hi.push(second_hi.chars().nth(1).unwrap());
        assert_ne!(literal_hi.as_ptr(), runtime_hi.as_ptr());

        // The interned handle still points at the first allocation.
        let third_hi = names.intern(runtime_hi);
        assert_eq!(first_hi.as_ptr(), third_hi.as_ptr());
    }
}